Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5 - Mailing list pgsql-bugs
From | Tom Lane |
---|---|
Subject | Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5 |
Date | |
Msg-id | 20303.1508014292@sss.pgh.pa.us |
In response to | Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5 (Tom Lane <tgl@sss.pgh.pa.us>) |
List | pgsql-bugs |
I wrote: > To know what value of randomAccess to pass to the tuplesort setup, > we have to know *at the first transition-function call* whether > there may be multiple final-function calls coming up. So > what I'm imagining is a simple boolean result "yes, there will be > only one finalfn call, so it can destructively modify the transition > state", or "there might be more than one finalfn call, so the finalfn(s) > must preserve transition state". And this info has to be available > throughout the aggregate run. Attached is a proposed patch to make the ordered-set aggregates safe for state merging. I've not tested it really thoroughly, but it passes the regression cases added in 52328727b. regards, tom lane diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 40d8ec9..1718285 100644 *** a/src/backend/executor/nodeAgg.c --- b/src/backend/executor/nodeAgg.c *************** typedef struct AggStatePerTransData *** 255,260 **** --- 255,265 ---- Aggref *aggref; /* + * Is this state value actually being shared by more than one Aggref? + */ + bool aggshared; + + /* * Nominal number of arguments for aggregate function. For plain aggs, * this excludes any ORDER BY expressions. For ordered-set aggs, this * counts both the direct and aggregated (ORDER BY) arguments. *************** ExecInitAgg(Agg *node, EState *estate, i *** 3345,3353 **** { /* * Existing compatible trans found, so just point the 'peragg' to ! * the same per-trans struct. */ pertrans = &pertransstates[existing_transno]; peragg->transno = existing_transno; } else --- 3350,3359 ---- { /* * Existing compatible trans found, so just point the 'peragg' to ! * the same per-trans struct, and mark the trans state as shared.
*/ pertrans = &pertransstates[existing_transno]; + pertrans->aggshared = true; peragg->transno = existing_transno; } else *************** build_pertrans_for_aggref(AggStatePerTra *** 3449,3454 **** --- 3455,3461 ---- /* Begin filling in the pertrans data */ pertrans->aggref = aggref; + pertrans->aggshared = false; pertrans->aggCollation = aggref->inputcollid; pertrans->transfn_oid = aggtransfn; pertrans->serialfn_oid = aggserialfn; *************** AggGetAggref(FunctionCallInfo fcinfo) *** 4105,4121 **** { if (fcinfo->context && IsA(fcinfo->context, AggState)) { AggStatePerAgg curperagg; AggStatePerTrans curpertrans; /* check curperagg (valid when in a final function) */ ! curperagg = ((AggState *) fcinfo->context)->curperagg; if (curperagg) return curperagg->aggref; /* check curpertrans (valid when in a transition function) */ ! curpertrans = ((AggState *) fcinfo->context)->curpertrans; if (curpertrans) return curpertrans->aggref; --- 4112,4129 ---- { if (fcinfo->context && IsA(fcinfo->context, AggState)) { + AggState *aggstate = (AggState *) fcinfo->context; AggStatePerAgg curperagg; AggStatePerTrans curpertrans; /* check curperagg (valid when in a final function) */ ! curperagg = aggstate->curperagg; if (curperagg) return curperagg->aggref; /* check curpertrans (valid when in a transition function) */ ! curpertrans = aggstate->curpertrans; if (curpertrans) return curpertrans->aggref; *************** AggGetTempMemoryContext(FunctionCallInfo *** 4146,4151 **** --- 4154,4197 ---- } /* + * AggStateIsShared - find out whether transition state is shared + * + * If the function is being called as an aggregate support function, + * return TRUE if the aggregate's transition state is shared across + * multiple aggregates, FALSE if it is not. + * + * Returns TRUE if not called as an aggregate support function. + * This is intended as a conservative answer, ie "no you'd better not + * scribble on your input". 
In particular, will return TRUE if the + * aggregate is being used as a window function, which is a scenario + * in which changing the transition state is a bad idea. We might + * want to refine the behavior for the window case in future. + */ + bool + AggStateIsShared(FunctionCallInfo fcinfo) + { + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + AggState *aggstate = (AggState *) fcinfo->context; + AggStatePerAgg curperagg; + AggStatePerTrans curpertrans; + + /* check curperagg (valid when in a final function) */ + curperagg = aggstate->curperagg; + + if (curperagg) + return aggstate->pertrans[curperagg->transno].aggshared; + + /* check curpertrans (valid when in a transition function) */ + curpertrans = aggstate->curpertrans; + + if (curpertrans) + return curpertrans->aggshared; + } + return true; + } + + /* * AggRegisterCallback - register a cleanup callback for an aggregate * * This is useful for aggs to register shutdown callbacks, which will ensure diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c index 25905a3..1e323d9 100644 *** a/src/backend/utils/adt/orderedsetaggs.c --- b/src/backend/utils/adt/orderedsetaggs.c *************** *** 40,53 **** * create just once per query because they will not change across groups. * The per-query struct and subsidiary data live in the executor's per-query * memory context, and go away implicitly at ExecutorEnd(). */ typedef struct OSAPerQueryState { ! /* Aggref for this aggregate: */ Aggref *aggref; /* Memory context containing this struct and other per-query data: */ MemoryContext qcontext; /* These fields are used only when accumulating tuples: */ --- 40,61 ---- * create just once per query because they will not change across groups. * The per-query struct and subsidiary data live in the executor's per-query * memory context, and go away implicitly at ExecutorEnd(). + * + * These structs are set up during the first call of the transition function. 
+ * Because we allow nodeAgg.c to merge ordered-set aggregates (but not + * hypothetical aggregates) with identical inputs and transition functions, + * this info must not depend on the particular aggregate (ie, particular + * final-function), nor on the direct argument(s) of the aggregate. */ typedef struct OSAPerQueryState { ! /* Representative Aggref for this aggregate: */ Aggref *aggref; /* Memory context containing this struct and other per-query data: */ MemoryContext qcontext; + /* Do we expect multiple final-function calls within one group? */ + bool rescan_needed; /* These fields are used only when accumulating tuples: */ *************** typedef struct OSAPerGroupState *** 91,96 **** --- 99,106 ---- Tuplesortstate *sortstate; /* Number of normal rows inserted into sortstate: */ int64 number_of_rows; + /* Have we already done tuplesort_performsort? */ + bool sort_done; } OSAPerGroupState; static void ordered_set_shutdown(Datum arg); *************** ordered_set_startup(FunctionCallInfo fci *** 146,151 **** --- 156,164 ---- qstate->aggref = aggref; qstate->qcontext = qcontext; + /* We need to support rescans if the trans state is shared */ + qstate->rescan_needed = AggStateIsShared(fcinfo); + /* Extract the sort information */ sortlist = aggref->aggorder; numSortCols = list_length(sortlist); *************** ordered_set_startup(FunctionCallInfo fci *** 277,291 **** qstate->sortOperators, qstate->sortCollations, qstate->sortNullsFirsts, ! work_mem, false); else osastate->sortstate = tuplesort_begin_datum(qstate->sortColType, qstate->sortOperator, qstate->sortCollation, qstate->sortNullsFirst, ! work_mem, false); osastate->number_of_rows = 0; /* Now register a shutdown callback to clean things up at end of group */ AggRegisterCallback(fcinfo, --- 290,307 ---- qstate->sortOperators, qstate->sortCollations, qstate->sortNullsFirsts, ! work_mem, ! 
qstate->rescan_needed); else osastate->sortstate = tuplesort_begin_datum(qstate->sortColType, qstate->sortOperator, qstate->sortCollation, qstate->sortNullsFirst, ! work_mem, ! qstate->rescan_needed); osastate->number_of_rows = 0; + osastate->sort_done = false; /* Now register a shutdown callback to clean things up at end of group */ AggRegisterCallback(fcinfo, *************** ordered_set_startup(FunctionCallInfo fci *** 306,319 **** * group) by ExecutorEnd. But we must take care to release any potential * non-memory resources. * ! * This callback is arguably unnecessary, since we don't support use of ! * ordered-set aggs in AGG_HASHED mode and there is currently no non-error ! * code path in non-hashed modes wherein nodeAgg.c won't call the finalfn ! * after calling the transfn one or more times. So in principle we could rely ! * on the finalfn to delete the tuplestore etc. However, it's possible that ! * such a code path might exist in future, and in any case it'd be ! * notationally tedious and sometimes require extra data copying to ensure ! * we always delete the tuplestore in the finalfn. */ static void ordered_set_shutdown(Datum arg) --- 322,333 ---- * group) by ExecutorEnd. But we must take care to release any potential * non-memory resources. * ! * In the case where we're not expecting multiple finalfn calls, we could ! * arguably rely on the finalfn to clean up; but it's easier and more testable ! * if we just do it the same way in either case. Note that many of the ! * finalfns could *not* free the tuplesort object, at least not without extra ! * data copying, because what they return is a pointer to a datum inside the ! * tuplesort object. */ static void ordered_set_shutdown(Datum arg) *************** percentile_disc_final(PG_FUNCTION_ARGS) *** 436,443 **** if (osastate->number_of_rows == 0) PG_RETURN_NULL(); ! /* Finish the sort */ ! tuplesort_performsort(osastate->sortstate); /*---------- * We need the smallest K such that (K/N) >= percentile. 
--- 450,463 ---- if (osastate->number_of_rows == 0) PG_RETURN_NULL(); ! /* Finish the sort, or rescan if we already did */ ! if (!osastate->sort_done) ! { ! tuplesort_performsort(osastate->sortstate); ! osastate->sort_done = true; ! } ! else ! tuplesort_rescan(osastate->sortstate); /*---------- * We need the smallest K such that (K/N) >= percentile. *************** percentile_disc_final(PG_FUNCTION_ARGS) *** 457,469 **** if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL)) elog(ERROR, "missing row in percentile_disc"); - /* - * Note: we *cannot* clean up the tuplesort object here, because the value - * to be returned is allocated inside its sortcontext. We could use - * datumCopy to copy it out of there, but it doesn't seem worth the - * trouble, since the cleanup callback will clear the tuplesort later. - */ - /* We shouldn't have stored any nulls, but do the right thing anyway */ if (isnull) PG_RETURN_NULL(); --- 477,482 ---- *************** percentile_cont_final_common(FunctionCal *** 543,550 **** Assert(expect_type == osastate->qstate->sortColType); ! /* Finish the sort */ ! tuplesort_performsort(osastate->sortstate); first_row = floor(percentile * (osastate->number_of_rows - 1)); second_row = ceil(percentile * (osastate->number_of_rows - 1)); --- 556,569 ---- Assert(expect_type == osastate->qstate->sortColType); ! /* Finish the sort, or rescan if we already did */ ! if (!osastate->sort_done) ! { ! tuplesort_performsort(osastate->sortstate); ! osastate->sort_done = true; ! } ! else ! tuplesort_rescan(osastate->sortstate); first_row = floor(percentile * (osastate->number_of_rows - 1)); second_row = ceil(percentile * (osastate->number_of_rows - 1)); *************** percentile_cont_final_common(FunctionCal *** 575,587 **** val = lerpfunc(first_val, second_val, proportion); } - /* - * Note: we *cannot* clean up the tuplesort object here, because the value - * to be returned may be allocated inside its sortcontext. 
We could use - * datumCopy to copy it out of there, but it doesn't seem worth the - * trouble, since the cleanup callback will clear the tuplesort later. - */ - PG_RETURN_DATUM(val); } --- 594,599 ---- *************** percentile_disc_multi_final(PG_FUNCTION_ *** 779,786 **** */ if (i < num_percentiles) { ! /* Finish the sort */ ! tuplesort_performsort(osastate->sortstate); for (; i < num_percentiles; i++) { --- 791,804 ---- */ if (i < num_percentiles) { ! /* Finish the sort, or rescan if we already did */ ! if (!osastate->sort_done) ! { ! tuplesort_performsort(osastate->sortstate); ! osastate->sort_done = true; ! } ! else ! tuplesort_rescan(osastate->sortstate); for (; i < num_percentiles; i++) { *************** percentile_disc_multi_final(PG_FUNCTION_ *** 804,814 **** } } - /* - * We could clean up the tuplesort object after forming the array, but - * probably not worth the trouble. - */ - /* We make the output array the same shape as the input */ PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull, ARR_NDIM(param), --- 822,827 ---- *************** percentile_cont_multi_final_common(Funct *** 902,909 **** */ if (i < num_percentiles) { ! /* Finish the sort */ ! tuplesort_performsort(osastate->sortstate); for (; i < num_percentiles; i++) { --- 915,928 ---- */ if (i < num_percentiles) { ! /* Finish the sort, or rescan if we already did */ ! if (!osastate->sort_done) ! { ! tuplesort_performsort(osastate->sortstate); ! osastate->sort_done = true; ! } ! else ! tuplesort_rescan(osastate->sortstate); for (; i < num_percentiles; i++) { *************** percentile_cont_multi_final_common(Funct *** 962,972 **** } } - /* - * We could clean up the tuplesort object after forming the array, but - * probably not worth the trouble. 
- */ - /* We make the output array the same shape as the input */ PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull, ARR_NDIM(param), --- 981,986 ---- *************** mode_final(PG_FUNCTION_ARGS) *** 1043,1050 **** shouldfree = !(osastate->qstate->typByVal); ! /* Finish the sort */ ! tuplesort_performsort(osastate->sortstate); /* Scan tuples and count frequencies */ while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val)) --- 1057,1070 ---- shouldfree = !(osastate->qstate->typByVal); ! /* Finish the sort, or rescan if we already did */ ! if (!osastate->sort_done) ! { ! tuplesort_performsort(osastate->sortstate); ! osastate->sort_done = true; ! } ! else ! tuplesort_rescan(osastate->sortstate); /* Scan tuples and count frequencies */ while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val)) *************** mode_final(PG_FUNCTION_ARGS) *** 1097,1109 **** if (shouldfree && !last_val_is_mode) pfree(DatumGetPointer(last_val)); - /* - * Note: we *cannot* clean up the tuplesort object here, because the value - * to be returned is allocated inside its sortcontext. We could use - * datumCopy to copy it out of there, but it doesn't seem worth the - * trouble, since the cleanup callback will clear the tuplesort later. 
- */ - if (mode_freq) PG_RETURN_DATUM(mode_val); else --- 1117,1122 ---- *************** hypothetical_rank_common(FunctionCallInf *** 1174,1179 **** --- 1187,1195 ---- hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc); + /* because we need a hypothetical row, we can't share transition state */ + Assert(!osastate->sort_done); + /* insert the hypothetical row into the sort */ slot = osastate->qstate->tupslot; ExecClearTuple(slot); *************** hypothetical_rank_common(FunctionCallInf *** 1190,1195 **** --- 1206,1212 ---- /* finish the sort */ tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; /* iterate till we find the hypothetical row */ while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot, NULL)) *************** hypothetical_rank_common(FunctionCallInf *** 1207,1216 **** ExecClearTuple(slot); - /* Might as well clean up the tuplesort object immediately */ - tuplesort_end(osastate->sortstate); - osastate->sortstate = NULL; - return rank; } --- 1224,1229 ---- *************** hypothetical_dense_rank_final(PG_FUNCTIO *** 1329,1334 **** --- 1342,1350 ---- /* Get short-term context we can use for execTuplesMatch */ tmpcontext = AggGetTempMemoryContext(fcinfo); + /* because we need a hypothetical row, we can't share transition state */ + Assert(!osastate->sort_done); + /* insert the hypothetical row into the sort */ slot = osastate->qstate->tupslot; ExecClearTuple(slot); *************** hypothetical_dense_rank_final(PG_FUNCTIO *** 1345,1350 **** --- 1361,1367 ---- /* finish the sort */ tuplesort_performsort(osastate->sortstate); + osastate->sort_done = true; /* * We alternate fetching into tupslot and extraslot so that we have the *************** hypothetical_dense_rank_final(PG_FUNCTIO *** 1391,1400 **** ExecDropSingleTupleTableSlot(extraslot); - /* Might as well clean up the tuplesort object immediately */ - tuplesort_end(osastate->sortstate); - osastate->sortstate = NULL; - rank = rank - duplicate_count; 
PG_RETURN_INT64(rank); --- 1408,1413 ---- diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index 5769f64..13f1bce 100644 *** a/src/include/catalog/pg_aggregate.h --- b/src/include/catalog/pg_aggregate.h *************** DATA(insert ( 3267 n 0 jsonb_agg_transfn *** 318,330 **** DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ )); /* ordered-set and hypothetical-set aggregates */ ! DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); --- 318,330 ---- DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ )); /* ordered-set and hypothetical-set aggregates */ ! 
DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ )); ! DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ )); diff --git a/src/include/fmgr.h b/src/include/fmgr.h index b604a5c..a68ec91 100644 *** a/src/include/fmgr.h --- b/src/include/fmgr.h *************** extern int AggCheckCallContext(FunctionC *** 698,703 **** --- 698,704 ---- MemoryContext *aggcontext); extern fmAggrefPtr AggGetAggref(FunctionCallInfo fcinfo); extern MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo); + extern bool AggStateIsShared(FunctionCallInfo fcinfo); extern void AggRegisterCallback(FunctionCallInfo fcinfo, fmExprContextCallbackFunction func, Datum arg); -- Sent via pgsql-bugs mailing list (pgsql-bugs@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-bugs
pgsql-bugs by date: