diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 053d4dc650..d500cf151b 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -20651,7 +20651,7 @@ SELECT NULLIF(value, '(none)') ... Collects all the input values, including nulls, into an array. - No + Yes @@ -20664,7 +20664,7 @@ SELECT NULLIF(value, '(none)') ... dimension. (The inputs must all have the same dimensionality, and cannot be empty or null.) - No + Yes @@ -21115,7 +21115,7 @@ SELECT NULLIF(value, '(none)') ... after the first is preceded by the corresponding delimiter (if it's not null). - No + Yes diff --git a/src/backend/optimizer/prep/prepagg.c b/src/backend/optimizer/prep/prepagg.c index da89b55402..374d0dda6b 100644 --- a/src/backend/optimizer/prep/prepagg.c +++ b/src/backend/optimizer/prep/prepagg.c @@ -305,10 +305,30 @@ preprocess_aggref(Aggref *aggref, PlannerInfo *root) * functions; if not, we can't serialize partial-aggregation * results. */ - else if (transinfo->aggtranstype == INTERNALOID && - (!OidIsValid(transinfo->serialfn_oid) || - !OidIsValid(transinfo->deserialfn_oid)) - root->hasNonSerialAggs = true; + else if (transinfo->aggtranstype == INTERNALOID) + { + + if (!OidIsValid(transinfo->serialfn_oid) || + !OidIsValid(transinfo->deserialfn_oid)) + root->hasNonSerialAggs = true; + + /* + * array_agg_serialize and array_agg_deserialize make use + * of the aggregate's non-byval input type's send and + * receive functions. There's a chance that the type + * being aggregated has one or both of these functions + * missing. In this case we must not allow the + * aggregate's serial and deserial functions to be used. + * It would be nice not to have to special case this and + * instead provide some sort of supporting function within + * the aggregate to do this, but for now, that seems like + * overkill for this one case. + */ + if ((transinfo->serialfn_oid == F_ARRAY_AGG_SERIALIZE || + transinfo->deserialfn_oid == F_ARRAY_AGG_DESERIALIZE) && + !agg_args_support_sendreceive(aggref)) + root->hasNonSerialAggs = true; + } } } agginfo->transno = transno; diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 3ef9e8ee5e..8fda0c25ae 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "access/htup_details.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_constraint.h" #include "catalog/pg_type.h" @@ -28,7 +29,7 @@ #include "rewrite/rewriteManip.h" #include "utils/builtins.h" #include "utils/lsyscache.h" - +#include "utils/syscache.h" typedef struct { @@ -1943,6 +1944,40 @@ resolve_aggregate_transtype(Oid aggfuncid, return aggtranstype; } +/* + * agg_args_support_sendreceive + * Returns true if all of aggref's non-byval arg types have send and + * receive functions. + */ +bool +agg_args_support_sendreceive(Aggref *aggref) +{ + ListCell *lc; + + foreach(lc, aggref->args) + { + HeapTuple typeTuple; + Form_pg_type pt; + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Oid type = exprType((Node *) tle->expr); + + typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type)); + if (!HeapTupleIsValid(typeTuple)) + elog(ERROR, "cache lookup failed for type %u", type); + + pt = (Form_pg_type) GETSTRUCT(typeTuple); + + if (!pt->typbyval && + (!OidIsValid(pt->typsend) || !OidIsValid(pt->typreceive))) + { + ReleaseSysCache(typeTuple); + return false; + } + ReleaseSysCache(typeTuple); + } + return true; +} + /* * Create an expression tree for the transition function of an aggregate.
* This is needed so that polymorphic functions can be used within an diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c index ca70590d7d..10b8764e5f 100644 --- a/src/backend/utils/adt/array_userfuncs.c +++ b/src/backend/utils/adt/array_userfuncs.c @@ -13,12 +13,33 @@ #include "postgres.h" #include "catalog/pg_type.h" +#include "libpq/pqformat.h" #include "common/int.h" +#include "port/pg_bitutils.h" #include "utils/array.h" +#include "utils/datum.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/typcache.h" +/* + * SerialIOData + * Used for caching element-type data in array_agg_serialize + */ +typedef struct SerialIOData +{ + FmgrInfo typsend; +} SerialIOData; + +/* + * DeserialIOData + * Used for caching element-type data in array_agg_deserialize + */ +typedef struct DeserialIOData +{ + FmgrInfo typreceive; + Oid typioparam; +} DeserialIOData; static Datum array_position_common(FunctionCallInfo fcinfo); @@ -499,6 +520,318 @@ array_agg_transfn(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state); } +Datum +array_agg_combine(PG_FUNCTION_ARGS) +{ + ArrayBuildState *state1; + ArrayBuildState *state2; + MemoryContext agg_context; + MemoryContext old_context; + int i; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1); + + if (state2 == NULL) + { + /* + * NULL state2 is easy, just return state1, which we know is already + * in the agg_context + */ + if (state1 == NULL) + PG_RETURN_NULL(); + PG_RETURN_POINTER(state1); + } + + if (state1 == NULL) + { + /* We must copy state2's data into the agg_context */ + state1 = initArrayResultWithSize(state2->element_type, agg_context, + false, state2->alen); + + old_context = MemoryContextSwitchTo(agg_context); + + for (i = 0; i < state2->nelems; i++) + { + if (!state2->dnulls[i]) + state1->dvalues[i] = datumCopy(state2->dvalues[i], + state1->typbyval, + state1->typlen); + else + state1->dvalues[i] = (Datum) 0; + } + + MemoryContextSwitchTo(old_context); + + memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems); + + state1->nelems = state2->nelems; + + PG_RETURN_POINTER(state1); + } + else if (state2->nelems > 0) + { + /* We only need to combine the two states if state2 has any elements */ + int reqsize = state1->nelems + state2->nelems; + MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext); + + Assert(state1->element_type == state2->element_type); + + /* Enlarge state1 arrays if needed */ + if (state1->alen < reqsize) + { + /* Use a power of 2 size rather than allocating just reqsize */ + state1->alen = pg_nextpower2_32(reqsize); + state1->dvalues = (Datum *) repalloc(state1->dvalues, + state1->alen * sizeof(Datum)); + state1->dnulls = (bool *) repalloc(state1->dnulls, + state1->alen * sizeof(bool)); + } + + /* Copy in the state2 elements to the end of the state1 arrays */ + for (i = 0; i < state2->nelems; i++) + { + if (!state2->dnulls[i]) + state1->dvalues[i + state1->nelems] = + datumCopy(state2->dvalues[i], + state1->typbyval, + state1->typlen); + else + state1->dvalues[i + state1->nelems] = (Datum) 0; + } + + memcpy(&state1->dnulls[state1->nelems], state2->dnulls, + sizeof(bool) * state2->nelems); + + state1->nelems = reqsize; + + MemoryContextSwitchTo(oldContext); + } + + PG_RETURN_POINTER(state1); +} + +/* + * array_agg_serialize + * 
Serialize ArrayBuildState into bytea. + */ +Datum +array_agg_serialize(PG_FUNCTION_ARGS) +{ + ArrayBuildState *state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (ArrayBuildState *) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * element_type. Putting this first is more convenient in deserialization + */ + pq_sendint32(&buf, state->element_type); + + /* + * nelems -- send first so we know how large to make the dvalues and + * dnulls array during deserialization. + */ + pq_sendint64(&buf, state->nelems); + + /* alen can be decided during deserialization */ + + /* typlen */ + pq_sendint16(&buf, state->typlen); + + /* typbyval */ + pq_sendbyte(&buf, state->typbyval); + + /* typalign */ + pq_sendbyte(&buf, state->typalign); + + /* dnulls */ + pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems); + + /* + * dvalues. By agreement with array_agg_deserialize, when the element + * type is byval, we just transmit the Datum array as-is, including any + * null elements. For by-ref types, we must invoke the element type's + * send function, and we skip null elements (which is why the nulls flags + * must be sent first). + */ + if (state->typbyval) + pq_sendbytes(&buf, (char *) state->dvalues, + sizeof(Datum) * state->nelems); + else + { + SerialIOData *iodata; + int i; + + /* Avoid repeat catalog lookups for typsend function */ + iodata = (SerialIOData *) fcinfo->flinfo->fn_extra; + if (iodata == NULL) + { + Oid typsend; + bool typisvarlena; + + iodata = (SerialIOData *) + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(SerialIOData)); + getTypeBinaryOutputInfo(state->element_type, &typsend, + &typisvarlena); + fmgr_info_cxt(typsend, &iodata->typsend, + fcinfo->flinfo->fn_mcxt); + fcinfo->flinfo->fn_extra = (void *) iodata; + } + + for (i = 0; i < state->nelems; i++) + { + bytea *outputbytes; + + if (state->dnulls[i]) + continue; + outputbytes = SendFunctionCall(&iodata->typsend, + state->dvalues[i]); + pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ); + pq_sendbytes(&buf, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +Datum +array_agg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + ArrayBuildState *result; + StringInfoData buf; + Oid element_type; + int64 nelems; + const char *temp; + + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. 
+ */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + /* element_type */ + element_type = pq_getmsgint(&buf, 4); + + /* nelems */ + nelems = pq_getmsgint64(&buf); + + /* Create output ArrayBuildState with the needed number of elements */ + result = initArrayResultWithSize(element_type, CurrentMemoryContext, + false, nelems); + result->nelems = nelems; + + /* typlen */ + result->typlen = pq_getmsgint(&buf, 2); + + /* typbyval */ + result->typbyval = pq_getmsgbyte(&buf); + + /* typalign */ + result->typalign = pq_getmsgbyte(&buf); + + /* dnulls */ + temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems); + memcpy(result->dnulls, temp, sizeof(bool) * nelems); + + /* dvalues --- see comment in array_agg_serialize */ + if (result->typbyval) + { + temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems); + memcpy(result->dvalues, temp, sizeof(Datum) * nelems); + } + else + { + DeserialIOData *iodata; + int i; + + /* Avoid repeat catalog lookups for typreceive function */ + iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra; + if (iodata == NULL) + { + Oid typreceive; + + iodata = (DeserialIOData *) + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(DeserialIOData)); + getTypeBinaryInputInfo(element_type, &typreceive, + &iodata->typioparam); + fmgr_info_cxt(typreceive, &iodata->typreceive, + fcinfo->flinfo->fn_mcxt); + fcinfo->flinfo->fn_extra = (void *) iodata; + } + + for (i = 0; i < nelems; i++) + { + int itemlen; + StringInfoData elem_buf; + char csave; + + if (result->dnulls[i]) + { + result->dvalues[i] = (Datum) 0; + continue; + } + + itemlen = pq_getmsgint(&buf, 4); + if (itemlen < 0 || itemlen > (buf.len - buf.cursor)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("insufficient data left in message"))); + + /* + * Rather than copying data around, we just set up a phony + * StringInfo pointing to the correct portion of the input buffer. + * We assume we can scribble on the input buffer so as to maintain + * the convention that StringInfos have a trailing null. + */ + elem_buf.data = &buf.data[buf.cursor]; + elem_buf.maxlen = itemlen + 1; + elem_buf.len = itemlen; + elem_buf.cursor = 0; + + buf.cursor += itemlen; + + csave = buf.data[buf.cursor]; + buf.data[buf.cursor] = '\0'; + + /* Now call the element's receiveproc */ + result->dvalues[i] = ReceiveFunctionCall(&iodata->typreceive, + &elem_buf, + iodata->typioparam, + -1); + + buf.data[buf.cursor] = csave; + } + } + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + Datum array_agg_finalfn(PG_FUNCTION_ARGS) { @@ -578,6 +911,299 @@ array_agg_array_transfn(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state); } +Datum +array_agg_array_combine(PG_FUNCTION_ARGS) +{ + ArrayBuildStateArr *state1; + ArrayBuildStateArr *state2; + MemoryContext agg_context; + MemoryContext old_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? 
NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1); + + if (state2 == NULL) + { + /* + * NULL state2 is easy, just return state1, which we know is already + * in the agg_context + */ + if (state1 == NULL) + PG_RETURN_NULL(); + PG_RETURN_POINTER(state1); + } + + if (state1 == NULL) + { + /* We must copy state2's data into the agg_context */ + old_context = MemoryContextSwitchTo(agg_context); + + state1 = initArrayResultArr(state2->array_type, InvalidOid, + agg_context, false); + + state1->abytes = state2->abytes; + state1->data = (char *) palloc(state1->abytes); + + if (state2->nullbitmap) + { + int size = (state2->aitems + 7) / 8; + + state1->nullbitmap = (bits8 *) palloc(size); + memcpy(state1->nullbitmap, state2->nullbitmap, size); + } + + memcpy(state1->data, state2->data, state2->nbytes); + state1->nbytes = state2->nbytes; + state1->aitems = state2->aitems; + state1->nitems = state2->nitems; + state1->ndims = state2->ndims; + memcpy(state1->dims, state2->dims, sizeof(state2->dims)); + memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs)); + state1->array_type = state2->array_type; + state1->element_type = state2->element_type; + + MemoryContextSwitchTo(old_context); + + PG_RETURN_POINTER(state1); + } + + /* We only need to combine the two states if state2 has any items */ + else if (state2->nitems > 0) + { + MemoryContext oldContext; + int reqsize = state1->nbytes + state2->nbytes; + int i; + + /* + * Check the states are compatible with each other. Ensure we use the + * same error messages that are listed in accumArrayResultArr so that + * the same error is shown as would have been if we'd not used the + * combine function for the aggregation. + */ + if (state1->ndims != state2->ndims) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("cannot accumulate arrays of different dimensionality"))); + + /* Check dimensions match ignoring the first dimension. */ + for (i = 1; i < state1->ndims; i++) + { + if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i]) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("cannot accumulate arrays of different dimensionality"))); + } + + + oldContext = MemoryContextSwitchTo(state1->mcontext); + + /* + * If there's not enough space in state1 then we'll need to reallocate + * more. + */ + if (state1->abytes < reqsize) + { + /* use a power of 2 size rather than allocating just reqsize */ + state1->abytes = pg_nextpower2_32(reqsize); + state1->data = (char *) repalloc(state1->data, state1->abytes); + } + + if (state2->nullbitmap) + { + int newnitems = state1->nitems + state2->nitems; + + if (state1->nullbitmap == NULL) + { + /* + * First input with nulls; we must retrospectively handle any + * previous inputs by marking all their items non-null. 
*/ + state1->aitems = pg_nextpower2_32(Max(256, newnitems + 1)); + state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8); + array_bitmap_copy(state1->nullbitmap, 0, + NULL, 0, + state1->nitems); + } + else if (newnitems > state1->aitems) + { + int newaitems = state1->aitems + state2->aitems; + + state1->aitems = pg_nextpower2_32(newaitems); + state1->nullbitmap = (bits8 *) + repalloc(state1->nullbitmap, (state1->aitems + 7) / 8); + } + array_bitmap_copy(state1->nullbitmap, state1->nitems, + state2->nullbitmap, 0, + state2->nitems); + } + + memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes); + state1->nbytes += state2->nbytes; + state1->nitems += state2->nitems; + + state1->dims[0] += state2->dims[0]; + /* remaining dims already match, per test above */ + + Assert(state1->array_type == state2->array_type); + Assert(state1->element_type == state2->element_type); + + MemoryContextSwitchTo(oldContext); + } + + PG_RETURN_POINTER(state1); +} + +/* + * array_agg_array_serialize + * Serialize ArrayBuildStateArr into bytea. + */ +Datum +array_agg_array_serialize(PG_FUNCTION_ARGS) +{ + ArrayBuildStateArr *state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * element_type. Putting this first is more convenient in deserialization + * so that we can init the new state sooner. + */ + pq_sendint32(&buf, state->element_type); + + /* array_type */ + pq_sendint32(&buf, state->array_type); + + /* nbytes */ + pq_sendint32(&buf, state->nbytes); + + /* data */ + pq_sendbytes(&buf, state->data, state->nbytes); + + /* abytes */ + pq_sendint32(&buf, state->abytes); + + /* aitems */ + pq_sendint32(&buf, state->aitems); + + /* nullbitmap */ + if (state->nullbitmap) + { + Assert(state->aitems > 0); + pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8); + } + + /* nitems */ + pq_sendint32(&buf, state->nitems); + + /* ndims */ + pq_sendint32(&buf, state->ndims); + + /* dims: XXX should we just send ndims elements? */ + pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims)); + + /* lbs */ + pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs)); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +Datum +array_agg_array_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + ArrayBuildStateArr *result; + StringInfoData buf; + Oid element_type; + Oid array_type; + int nbytes; + const char *temp; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure.
+ */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + /* element_type */ + element_type = pq_getmsgint(&buf, 4); + + /* array_type */ + array_type = pq_getmsgint(&buf, 4); + + /* nbytes */ + nbytes = pq_getmsgint(&buf, 4); + + result = initArrayResultArr(array_type, element_type, + CurrentMemoryContext, false); + + result->abytes = 1024; + while (result->abytes < nbytes) + result->abytes *= 2; + + result->data = (char *) palloc(result->abytes); + + /* data */ + temp = pq_getmsgbytes(&buf, nbytes); + memcpy(result->data, temp, nbytes); + result->nbytes = nbytes; + + /* abytes */ + result->abytes = pq_getmsgint(&buf, 4); + + /* aitems: might be 0 */ + result->aitems = pq_getmsgint(&buf, 4); + + /* nullbitmap */ + if (result->aitems > 0) + { + int size = (result->aitems + 7) / 8; + + result->nullbitmap = (bits8 *) palloc(size); + temp = pq_getmsgbytes(&buf, size); + memcpy(result->nullbitmap, temp, size); + } + else + result->nullbitmap = NULL; + + /* nitems */ + result->nitems = pq_getmsgint(&buf, 4); + + /* ndims */ + result->ndims = pq_getmsgint(&buf, 4); + + /* dims */ + temp = pq_getmsgbytes(&buf, sizeof(result->dims)); + memcpy(result->dims, temp, sizeof(result->dims)); + + /* lbs */ + temp = pq_getmsgbytes(&buf, sizeof(result->lbs)); + memcpy(result->lbs, temp, sizeof(result->lbs)); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + Datum array_agg_array_finalfn(PG_FUNCTION_ARGS) { diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c index 495e449a9e..dc3f634b48 100644 --- a/src/backend/utils/adt/arrayfuncs.c +++ b/src/backend/utils/adt/arrayfuncs.c @@ -5236,6 +5236,24 @@ array_insert_slice(ArrayType *destArray, */ ArrayBuildState * initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) +{ + /* + * When using a subcontext, we can afford to start with a somewhat larger + * initial array size. Without subcontexts, we'd better hope that most of + * the states stay small ... + */ + return initArrayResultWithSize(element_type, rcontext, subcontext, + subcontext ? 64 : 8); +} + +/* + * initArrayResultWithSize + * As initArrayResult, but allow the initial size of the allocated arrays + * to be specified. + */ +ArrayBuildState * +initArrayResultWithSize(Oid element_type, MemoryContext rcontext, + bool subcontext, int initsize) { ArrayBuildState *astate; MemoryContext arr_context = rcontext; @@ -5250,7 +5268,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) MemoryContextAlloc(arr_context, sizeof(ArrayBuildState)); astate->mcontext = arr_context; astate->private_cxt = subcontext; - astate->alen = (subcontext ? 64 : 8); /* arbitrary starting array size */ + astate->alen = initsize; astate->dvalues = (Datum *) MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum)); astate->dnulls = (bool *) diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index 919138eaf3..96ccd90bd6 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -503,29 +503,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); - /* Append the value unless null. */ + /* Append the value unless null, preceding it with the delimiter. */ if (!PG_ARGISNULL(1)) { bytea *value = PG_GETARG_BYTEA_PP(1); + bool isfirst = false; - /* On the first time through, we ignore the delimiter. 
*/ + /* + * You might think we can just throw away the first delimiter, however + * we must keep it as we may be a parallel worker doing partial + * aggregation building a state to send to the main process. We need + * to keep the delimiter of every aggregation so that the combine + * function can properly join up the strings of two separately + * partially aggregated results. The first delimiter is only stripped + * off in the final function. To know how much to strip off the front + * of the string, we store the length of the first delimiter in the + * StringInfo's cursor field, which we don't otherwise need here. + */ if (state == NULL) + { state = makeStringAggState(fcinfo); - else if (!PG_ARGISNULL(2)) + isfirst = true; + } + + if (!PG_ARGISNULL(2)) { bytea *delim = PG_GETARG_BYTEA_PP(2); - appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim)); + appendBinaryStringInfo(state, VARDATA_ANY(delim), + VARSIZE_ANY_EXHDR(delim)); + if (isfirst) + state->cursor = VARSIZE_ANY_EXHDR(delim); } - appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value)); + appendBinaryStringInfo(state, VARDATA_ANY(value), + VARSIZE_ANY_EXHDR(value)); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ - PG_RETURN_POINTER(state); + if (state) + PG_RETURN_POINTER(state); + PG_RETURN_NULL(); } Datum @@ -540,11 +561,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS) if (state != NULL) { + /* As per comment in transfn, strip data before the cursor position */ bytea *result; + int strippedlen = state->len - state->cursor; - result = (bytea *) palloc(state->len + VARHDRSZ); - SET_VARSIZE(result, state->len + VARHDRSZ); - memcpy(VARDATA(result), state->data, state->len); + result = (bytea *) palloc(strippedlen + VARHDRSZ); + SET_VARSIZE(result, strippedlen + VARHDRSZ); + memcpy(VARDATA(result), &state->data[state->cursor], strippedlen); PG_RETURN_BYTEA_P(result); } else @@ -5373,23 +5396,171 @@ string_agg_transfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); - /* Append the value unless null. */ + /* Append the value unless null, preceding it with the delimiter. */ if (!PG_ARGISNULL(1)) { - /* On the first time through, we ignore the delimiter. */ + text *value = PG_GETARG_TEXT_PP(1); + bool isfirst = false; + + /* + * You might think we can just throw away the first delimiter, however + * we must keep it as we may be a parallel worker doing partial + * aggregation building a state to send to the main process. We need + * to keep the delimiter of every aggregation so that the combine + * function can properly join up the strings of two separately + * partially aggregated results. The first delimiter is only stripped + * off in the final function. To know how much to strip off the front + * of the string, we store the length of the first delimiter in the + * StringInfo's cursor field, which we don't otherwise need here. 
+ */ if (state == NULL) + { state = makeStringAggState(fcinfo); - else if (!PG_ARGISNULL(2)) - appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */ + isfirst = true; + } - appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */ + if (!PG_ARGISNULL(2)) + { + text *delim = PG_GETARG_TEXT_PP(2); + + appendStringInfoText(state, delim); + if (isfirst) + state->cursor = VARSIZE_ANY_EXHDR(delim); + } + + appendStringInfoText(state, value); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ - PG_RETURN_POINTER(state); + if (state) + PG_RETURN_POINTER(state); + PG_RETURN_NULL(); +} + +/* + * string_agg_combine + * Aggregate combine function for string_agg(text) and string_agg(bytea) + */ +Datum +string_agg_combine(PG_FUNCTION_ARGS) +{ + StringInfo state1; + StringInfo state2; + MemoryContext agg_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1); + + if (state2 == NULL) + { + /* + * NULL state2 is easy, just return state1, which we know is already + * in the agg_context + */ + if (state1 == NULL) + PG_RETURN_NULL(); + PG_RETURN_POINTER(state1); + } + + if (state1 == NULL) + { + /* We must copy state2's data into the agg_context */ + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(agg_context); + state1 = makeStringAggState(fcinfo); + appendBinaryStringInfo(state1, state2->data, state2->len); + state1->cursor = state2->cursor; + MemoryContextSwitchTo(old_context); + } + else if (state2->len > 0) + { + /* Combine ... state1->cursor does not change in this case */ + appendBinaryStringInfo(state1, state2->data, state2->len); + } + + PG_RETURN_POINTER(state1); +} + +/* + * string_agg_serialize + * Aggregate serialize function for string_agg(text) and string_agg(bytea) + * + * This is strict, so we need not handle NULL input + */ +Datum +string_agg_serialize(PG_FUNCTION_ARGS) +{ + StringInfo state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (StringInfo) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* cursor */ + pq_sendint(&buf, state->cursor, 4); + + /* data */ + pq_sendbytes(&buf, state->data, state->len); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); +} + +/* + * string_agg_deserialize + * Aggregate deserial function for string_agg(text) and string_agg(bytea) + * + * This is strict, so we need not handle NULL input + */ +Datum +string_agg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + StringInfo result; + StringInfoData buf; + char *data; + int datalen; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. 
+ */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + result = makeStringAggState(fcinfo); + + /* cursor */ + result->cursor = pq_getmsgint(&buf, 4); + + /* data */ + datalen = VARSIZE_ANY_EXHDR(sstate) - 4; + data = (char *) pq_getmsgbytes(&buf, datalen); + appendBinaryStringInfo(result, data, datalen); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); } Datum @@ -5403,7 +5574,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS) state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); if (state != NULL) - PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len)); + { + /* As per comment in transfn, strip data before the cursor position */ + PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor], + state->len - state->cursor)); + } else PG_RETURN_NULL(); } diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat index 86cc650798..23c933749b 100644 --- a/src/include/catalog/pg_aggregate.dat +++ b/src/include/catalog/pg_aggregate.dat @@ -537,19 +537,28 @@ # array { aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn', - aggfinalfn => 'array_agg_finalfn', aggfinalextra => 't', - aggtranstype => 'internal' }, + aggcombinefn => 'array_agg_combine', aggserialfn => 'array_agg_serialize', + aggdeserialfn => 'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn', + aggfinalextra => 't', aggtranstype => 'internal' }, { aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn', + aggcombinefn => 'array_agg_array_combine', + aggserialfn => 'array_agg_array_serialize', + aggdeserialfn => 'array_agg_array_deserialize', aggfinalfn => 'array_agg_array_finalfn', aggfinalextra => 't', aggtranstype => 'internal' }, # text { aggfnoid => 'string_agg(text,text)', aggtransfn => 'string_agg_transfn', + aggcombinefn => 'string_agg_combine', aggserialfn => 'string_agg_serialize', + aggdeserialfn => 'string_agg_deserialize', aggfinalfn => 'string_agg_finalfn', aggtranstype => 'internal' }, # bytea { aggfnoid => 'string_agg(bytea,bytea)', aggtransfn => 'bytea_string_agg_transfn', + aggcombinefn => 'string_agg_combine', + aggserialfn => 'string_agg_serialize', + aggdeserialfn => 'string_agg_deserialize', aggfinalfn => 'bytea_string_agg_finalfn', aggtranstype => 'internal' }, # range diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index be47583122..81ff5153d8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -1643,6 +1643,15 @@ { oid => '2333', descr => 'aggregate transition function', proname => 'array_agg_transfn', proisstrict => 'f', prorettype => 'internal', proargtypes => 'internal anynonarray', prosrc => 'array_agg_transfn' }, +{ oid => '9328', descr => 'aggregate combine function', + proname => 'array_agg_combine', proisstrict => 'f', prorettype => 'internal', + proargtypes => 'internal internal', prosrc => 'array_agg_combine' }, +{ oid => '9329', descr => 'aggregate serial function', + proname => 'array_agg_serialize', prorettype => 'bytea', + proargtypes => 'internal', prosrc => 'array_agg_serialize' }, +{ oid => '9330', descr => 'aggregate deserial function', + proname => 'array_agg_deserialize', prorettype => 'internal', + proargtypes => 'bytea internal', prosrc => 'array_agg_deserialize' }, { oid => '2334', descr => 'aggregate final function', proname => 'array_agg_finalfn', proisstrict => 'f', prorettype => 'anyarray', proargtypes => 'internal anynonarray', 
prosrc => 'array_agg_finalfn' }, @@ -1654,6 +1663,15 @@ proname => 'array_agg_array_transfn', proisstrict => 'f', prorettype => 'internal', proargtypes => 'internal anyarray', prosrc => 'array_agg_array_transfn' }, +{ oid => '9331', descr => 'aggregate combine function', + proname => 'array_agg_array_combine', proisstrict => 'f', prorettype => 'internal', + proargtypes => 'internal internal', prosrc => 'array_agg_array_combine' }, +{ oid => '9332', descr => 'aggregate serial function', + proname => 'array_agg_array_serialize', prorettype => 'bytea', + proargtypes => 'internal', prosrc => 'array_agg_array_serialize' }, +{ oid => '9333', descr => 'aggregate deserial function', + proname => 'array_agg_array_deserialize', prorettype => 'internal', + proargtypes => 'bytea internal', prosrc => 'array_agg_array_deserialize' }, { oid => '4052', descr => 'aggregate final function', proname => 'array_agg_array_finalfn', proisstrict => 'f', prorettype => 'anyarray', proargtypes => 'internal anyarray', @@ -4922,6 +4940,15 @@ { oid => '3535', descr => 'aggregate transition function', proname => 'string_agg_transfn', proisstrict => 'f', prorettype => 'internal', proargtypes => 'internal text text', prosrc => 'string_agg_transfn' }, +{ oid => '9334', descr => 'aggregate combine function', + proname => 'string_agg_combine', proisstrict => 'f', prorettype => 'internal', + proargtypes => 'internal internal', prosrc => 'string_agg_combine' }, +{ oid => '9335', descr => 'aggregate serial function', + proname => 'string_agg_serialize', prorettype => 'bytea', + proargtypes => 'internal', prosrc => 'string_agg_serialize' }, +{ oid => '9336', descr => 'aggregate deserial function', + proname => 'string_agg_deserialize', prorettype => 'internal', + proargtypes => 'bytea internal', prosrc => 'string_agg_deserialize' }, { oid => '3536', descr => 'aggregate final function', proname => 'string_agg_finalfn', proisstrict => 'f', prorettype => 'text', proargtypes => 'internal', prosrc => 'string_agg_finalfn' }, diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h index c56822f645..f7af0c21a7 100644 --- a/src/include/parser/parse_agg.h +++ b/src/include/parser/parse_agg.h @@ -35,6 +35,8 @@ extern Oid resolve_aggregate_transtype(Oid aggfuncid, Oid *inputTypes, int numArguments); +extern bool agg_args_support_sendreceive(Aggref *aggref); + extern void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, diff --git a/src/include/utils/array.h b/src/include/utils/array.h index 2f794d1168..ed82242bd8 100644 --- a/src/include/utils/array.h +++ b/src/include/utils/array.h @@ -409,6 +409,9 @@ extern bool array_contains_nulls(ArrayType *array); extern ArrayBuildState *initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext); +extern ArrayBuildState *initArrayResultWithSize(Oid element_type, + MemoryContext rcontext, + bool subcontext, int initsize); extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate, Datum dvalue, bool disnull, Oid element_type, diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index b2198724e3..d92ab0f7dc 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -1831,6 +1831,104 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table; (1 row) drop table bytea_test_table; +-- Test parallel string_agg and array_agg +create table pagg_test (x int, y int); +insert into pagg_test +select (case x % 4 when 1 then null 
else x end), x % 10 +from generate_series(1,5000) x; +set parallel_setup_cost TO 0; +set parallel_tuple_cost TO 0; +set parallel_leader_participation TO 0; +set min_parallel_table_scan_size = 0; +set bytea_output = 'escape'; +-- create a view as we otherwise have to repeat this query a few times. +create view v_pagg_test AS +select + y, + min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct, + min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct, + min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct, + min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct +from ( + select + y, + unnest(regexp_split_to_array(a1.t, ','))::int AS t, + unnest(regexp_split_to_array(a1.b::text, ',')) AS b, + unnest(a1.a) AS a, + unnest(a1.aa) AS aa + from ( + select + y, + string_agg(x::text, ',') AS t, + string_agg(x::text::bytea, ',') AS b, + array_agg(x) AS a, + array_agg(ARRAY[x]) AS aa + from pagg_test + group by y + ) a1 +) a2 +group by y; +-- Ensure results are correct. +select * from v_pagg_test order by y; + y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct +---+------+------+------------+------+------+------------+------+------+------------+-------+-------+------------- + 0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500 + 1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250 + 2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500 + 3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250 + 4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500 + 5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250 + 6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500 + 7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250 + 8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500 + 9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250 +(10 rows) + +-- Ensure parallel aggregation is actually being used. +explain (costs off) select * from v_pagg_test order by y; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Group Key: pagg_test.y + -> Sort + Sort Key: pagg_test.y, (((unnest(regexp_split_to_array((string_agg((pagg_test.x)::text, ','::text)), ','::text))))::integer) + -> Result + -> ProjectSet + -> Finalize HashAggregate + Group Key: pagg_test.y + -> Gather + Workers Planned: 2 + -> Partial HashAggregate + Group Key: pagg_test.y + -> Parallel Seq Scan on pagg_test +(13 rows) + +set max_parallel_workers_per_gather = 0; +-- Ensure results are the same without parallel aggregation. 
+select * from v_pagg_test order by y; + y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct +---+------+------+------------+------+------+------------+------+------+------------+-------+-------+------------- + 0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500 + 1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250 + 2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500 + 3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250 + 4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500 + 5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250 + 6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500 + 7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250 + 8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500 + 9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250 +(10 rows) + +-- Clean up +reset max_parallel_workers_per_gather; +reset bytea_output; +reset min_parallel_table_scan_size; +reset parallel_leader_participation; +reset parallel_tuple_cost; +reset parallel_setup_cost; +drop view v_pagg_test; +drop table pagg_test; -- FILTER tests select min(unique1) filter (where unique1 > 100) from tenk1; min diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index 4540a06f45..bdbea3cbd9 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -700,6 +700,68 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table; drop table bytea_test_table; +-- Test parallel string_agg and array_agg +create table pagg_test (x int, y int); +insert into pagg_test +select (case x % 4 when 1 then null else x end), x % 10 +from generate_series(1,5000) x; + +set parallel_setup_cost TO 0; +set parallel_tuple_cost TO 0; +set parallel_leader_participation TO 0; +set min_parallel_table_scan_size = 0; +set bytea_output = 'escape'; + +-- create a view as we otherwise have to repeat this query a few times. +create view v_pagg_test AS +select + y, + min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct, + min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct, + min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct, + min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct +from ( + select + y, + unnest(regexp_split_to_array(a1.t, ','))::int AS t, + unnest(regexp_split_to_array(a1.b::text, ',')) AS b, + unnest(a1.a) AS a, + unnest(a1.aa) AS aa + from ( + select + y, + string_agg(x::text, ',') AS t, + string_agg(x::text::bytea, ',') AS b, + array_agg(x) AS a, + array_agg(ARRAY[x]) AS aa + from pagg_test + group by y + ) a1 +) a2 +group by y; + +-- Ensure results are correct. +select * from v_pagg_test order by y; + +-- Ensure parallel aggregation is actually being used. +explain (costs off) select * from v_pagg_test order by y; + +set max_parallel_workers_per_gather = 0; + +-- Ensure results are the same without parallel aggregation. +select * from v_pagg_test order by y; + +-- Clean up +reset max_parallel_workers_per_gather; +reset bytea_output; +reset min_parallel_table_scan_size; +reset parallel_leader_participation; +reset parallel_tuple_cost; +reset parallel_setup_cost; + +drop view v_pagg_test; +drop table pagg_test; + -- FILTER tests select min(unique1) filter (where unique1 > 100) from tenk1;
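
For reviewers who want to exercise the new combine/serialize/deserialize paths interactively, the following minimal sketch mirrors the regression test added above; the table name pagg_demo and the row count are illustrative and not part of the patch. Note that the regression test deliberately compares min/max/count(distinct) over the unnested aggregate results rather than the aggregated strings themselves, because the order in which rows reach the transition and combine functions is not deterministic under parallelism.

-- Illustrative only: table name and row count are arbitrary; the GUC
-- settings mirror the regression test above.
CREATE TABLE pagg_demo AS
SELECT g AS x, g % 10 AS y FROM generate_series(1, 100000) g;

SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size = 0;

-- With this patch applied, the plan is expected to show Partial and
-- Finalize aggregate steps around a Gather; without it, string_agg() and
-- array_agg() force a serial aggregation plan.
EXPLAIN (COSTS OFF)
SELECT y, string_agg(x::text, ',') AS t, array_agg(x) AS a
FROM pagg_demo
GROUP BY y;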
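
The prepagg.c hunk above disables the serial/deserial functions when an aggregated type lacks binary I/O. A hedged way to observe that fallback, assuming aclitem still has no typsend/typreceive entries in pg_type (verify first, as below; if it does, substitute another type without binary I/O):

-- Confirm the assumption that aclitem lacks binary send/receive functions;
-- the columns should read '-' if so.
SELECT typname, typsend, typreceive
FROM pg_type
WHERE typname = 'aclitem';

-- Even with the parallel-friendly settings from the previous example, this
-- aggregate should not be planned with a Partial Aggregate step, because
-- array_agg_serialize would need aclitem's send function.
EXPLAIN (COSTS OFF)
SELECT array_agg(relacl[1]) FROM pg_class;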
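
array_agg_array_combine intentionally reuses the error texts from accumArrayResultArr so that a parallel plan reports the same error as a serial one for ragged input. A quick illustrative check (the literal arrays are arbitrary):

-- Expected to fail with the same message whether or not the combine
-- function runs:
--   ERROR:  cannot accumulate arrays of different dimensionality
SELECT array_agg(a)
FROM (VALUES ('{1,2}'::int[]), ('{{3,4},{5,6}}'::int[])) AS v(a);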