From be85f1810ccba0988c68a875c5debff630eabf02 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Thu, 21 Sep 2023 16:52:30 +0530 Subject: [PATCH v27 6/8] Implement serialization functions for interval aggregate state Now that interval aggregates use transition state which of date type Internal, parallel aggregates require these separate functions. Jian He and Ashutosh Bapat --- src/backend/utils/adt/timestamp.c | 84 +++++++++++++++++++++++++++- src/include/catalog/pg_aggregate.dat | 4 ++ src/include/catalog/pg_proc.dat | 6 ++ 3 files changed, 92 insertions(+), 2 deletions(-) diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 4b35ee0bdf..0b385fd72c 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -3821,8 +3821,12 @@ in_range_interval_interval(PG_FUNCTION_ARGS) /* - * Prepare state data for an interval aggregate function that needs to compute - * sum and count. + * Prepare state data for an interval aggregate function, that needs to compute + * sum and count, in the aggregate's memory context. + * + * The function is used when the state data needs to be allocated in aggregate's + * context. When the state data needs to be allocated in the current memory + * context, we use palloc0 directly e.g. interval_avg_deserialize(). */ static IntervalAggState * makeIntervalAggState(FunctionCallInfo fcinfo) @@ -4000,6 +4004,82 @@ interval_avg_combine(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state1); } +/* + * interval_avg_serialize + * Serialize IntervalAggState for interval aggregates. + */ +Datum +interval_avg_serialize(PG_FUNCTION_ARGS) +{ + IntervalAggState *state; + StringInfoData buf; + bytea *result; + + /* Ensure we disallow calling when not in aggregate context */ + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + /* TODO: Handle NULL inputs? */ + state = (IntervalAggState *) PG_GETARG_POINTER(0); + pq_begintypsend(&buf); + /* N */ + pq_sendint64(&buf, state->N); + /* Finite interval value */ + pq_sendint64(&buf, state->sumX.time); + pq_sendint32(&buf, state->sumX.day); + pq_sendint32(&buf, state->sumX.month); + /* pInfcount */ + pq_sendint64(&buf, state->pInfcount); + /* nInfcount */ + pq_sendint64(&buf, state->nInfcount); + result = pq_endtypsend(&buf); + PG_RETURN_BYTEA_P(result); +} + +/* + * interval_avg_deserialize + * Deserialize bytea into IntervalAggState for interval aggregates. + */ +Datum +interval_avg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + IntervalAggState *result; + StringInfoData buf; + + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Initialize a StringInfo so that we can "receive" it using the standard + * recv-function infrastructure. + */ + initReadOnlyStringInfo(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); + + result = (IntervalAggState *) palloc0(sizeof(IntervalAggState)); + + /* N */ + result->N = pq_getmsgint64(&buf); + + /* Interval struct elements, one by one. */ + result->sumX.time = pq_getmsgint64(&buf); + result->sumX.day = pq_getmsgint(&buf, sizeof(result->sumX.day)); + result->sumX.month = pq_getmsgint(&buf, sizeof(result->sumX.month)); + + /* pInfcount */ + result->pInfcount = pq_getmsgint64(&buf); + + /* nInfcount */ + result->nInfcount = pq_getmsgint64(&buf); + + pq_getmsgend(&buf); + + PG_RETURN_POINTER(result); +} + /* * Remove the given interval value from the aggregated state. */ diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat index e2087d7be1..0e62c3f7a6 100644 --- a/src/include/catalog/pg_aggregate.dat +++ b/src/include/catalog/pg_aggregate.dat @@ -45,6 +45,8 @@ aggtranstype => '_float8', agginitval => '{0,0,0}' }, { aggfnoid => 'avg(interval)', aggtransfn => 'interval_avg_accum', aggfinalfn => 'interval_avg', aggcombinefn => 'interval_avg_combine', + aggserialfn => 'interval_avg_serialize', + aggdeserialfn => 'interval_avg_deserialize', aggmtransfn => 'interval_avg_accum', aggminvtransfn => 'interval_accum_inv', aggmfinalfn => 'interval_avg', aggtranstype => 'internal', aggmtranstype => 'internal', aggtransspace => '128', @@ -75,6 +77,8 @@ aggtranstype => 'money', aggmtranstype => 'money' }, { aggfnoid => 'sum(interval)', aggtransfn => 'interval_avg_accum', aggfinalfn => 'interval_sum', aggcombinefn => 'interval_avg_combine', + aggserialfn => 'interval_avg_serialize', + aggdeserialfn => 'interval_avg_deserialize', aggmtransfn => 'interval_avg_accum', aggminvtransfn => 'interval_accum_inv', aggmfinalfn => 'interval_sum', aggtranstype => 'internal', aggmtranstype => 'internal', aggtransspace => '128', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 791359a6e0..7d66de5043 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -4928,6 +4928,12 @@ proname => 'interval_accum_inv', proisstrict => 'f', prorettype => 'internal', proargtypes => 'internal interval', prosrc => 'interval_accum_inv' }, +{ oid => '3813', descr => 'aggregate serial function', + proname => 'interval_avg_serialize', prorettype => 'bytea', + proargtypes => 'internal', prosrc => 'interval_avg_serialize' }, +{ oid => '3814', descr => 'aggregate deserial function', + proname => 'interval_avg_deserialize', prorettype => 'internal', + proargtypes => 'bytea internal', prosrc => 'interval_avg_deserialize' }, { oid => '1844', descr => 'aggregate final function', proname => 'interval_avg', prorettype => 'interval', proargtypes => 'internal', prosrc => 'interval_avg' }, -- 2.35.3