S - The type of the aggregation function's state object.
public interface UserDefinedAggregationFunctionAdapter<S extends Serializable>
Implementing a user-defined aggregation function involves the following steps: defining a state object (e.g. for a COUNT aggregate, a simple counter is maintained as the state object), defining how the state is updated for each new record, and defining how the final aggregate value is built from the intermediate state object. Some further methods also need to be implemented. They mainly deal with optimizations and are covered later.
Important: Aggregation functions defined by this interface may be used in the query engine's pull algebra as well as in its push algebra. The latter involves an additional temporal dimension besides the group dimensions, and this adds an important requirement to the implementation of the methods described below: any method operating on a state object must never modify that object itself but instead needs to return a new state object with the updated information.
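As a minimal sketch of these steps, the following COUNT adapter keeps a java.lang.Long counter as its state. Because Long is immutable, every update necessarily yields a new state object, which satisfies the push-algebra requirement above. The Type enum and the interface declared here are stand-ins assumed to mirror the method summary on this page, not the real declarations (their package is not given); the parameter type DOUBLE is an arbitrary choice for the sketch, and the PN methods simply refuse until they are covered later.

```java
import java.io.Serializable;

// Stand-in for JavaTypes.Type: only the constants mentioned in this document.
enum Type { INTEGER, LONG, FLOAT, DOUBLE, BIGDECIMAL, STRING }

// Stand-in assumed to mirror the method summary of UserDefinedAggregationFunctionAdapter.
interface UserDefinedAggregationFunctionAdapter<S extends Serializable> {
    Type[] getParameterTypes();
    Type getAggregateType();
    S createInitialState();
    S aggregate(S state, Serializable[] values);
    Object getAggregate(S state);
    S negativeCall(S state, Serializable[] values);
    boolean supportsPN();
    boolean isEmptyAggregate(S state);
}

// COUNT: the state is a Long counter. Long is immutable, so each update
// produces a new state object and the previous one is never modified.
class CountAdapter implements UserDefinedAggregationFunctionAdapter<Long> {
    @Override public Type[] getParameterTypes() { return new Type[] { Type.DOUBLE }; }
    @Override public Type getAggregateType() { return Type.LONG; }
    @Override public Long createInitialState() { return 0L; }

    @Override public Long aggregate(Long state, Serializable[] values) {
        // count only non-null argument values
        return values[0] == null ? state : state + 1;
    }

    @Override public Object getAggregate(Long state) { return state; }

    // This sketch does not support the PN approach.
    @Override public Long negativeCall(Long state, Serializable[] values) {
        throw new UnsupportedOperationException("PN not supported");
    }
    @Override public boolean supportsPN() { return false; }
    @Override public boolean isEmptyAggregate(Long state) {
        throw new UnsupportedOperationException("PN not supported");
    }
}
```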
Implicit type conversions, for instance from Type.INTEGER to Type.LONG or from Type.FLOAT to Type.DOUBLE, may be applied by the parser and translator when looking up a matching aggregation function adapter. Hence an aggregation function with the signature (Type.DOUBLE, Type.DOUBLE) -> Type.DOUBLE also matches a call with two Type.INTEGER arguments or a call with one Type.INTEGER and one Type.DOUBLE argument. However, if, for instance, any of the arguments is of type Type.BIGDECIMAL or Type.STRING, the signature does not match and the adapter does not qualify for the aggregation function call.
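The matching rule can be illustrated with a hypothetical lookup helper. It assumes that numeric types widen along INTEGER, LONG, FLOAT, DOUBLE, analogous to Java's primitive widening; the text above only names INTEGER to LONG and FLOAT to DOUBLE explicitly and implies INTEGER to DOUBLE, so the exact rule used by the parser may differ. SignatureMatcher is an invented name and Type is a stand-in enum.

```java
import java.util.List;

// Stand-in for JavaTypes.Type.
enum Type { INTEGER, LONG, FLOAT, DOUBLE, BIGDECIMAL, STRING }

// Hypothetical signature matcher with implicit numeric widening.
final class SignatureMatcher {
    // ASSUMPTION: widening order as for Java primitives (int -> long -> float -> double).
    private static final List<Type> WIDENING_ORDER =
        List.of(Type.INTEGER, Type.LONG, Type.FLOAT, Type.DOUBLE);

    static boolean isAssignable(Type actual, Type declared) {
        if (actual == declared) return true;
        int from = WIDENING_ORDER.indexOf(actual);
        int to = WIDENING_ORDER.indexOf(declared);
        // both types must be numeric and the conversion may only widen
        return from >= 0 && to >= 0 && from < to;
    }

    static boolean matches(Type[] declared, Type[] actual) {
        if (declared.length != actual.length) return false;
        for (int i = 0; i < declared.length; i++) {
            if (!isAssignable(actual[i], declared[i])) return false;
        }
        return true;
    }
}
```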
If the aggregation function is to be overloaded with multiple distinct signatures, a UserDefinedAggregationFunctionAdapterFactory needs to be implemented, which itself instantiates the correct adapter for a given set of input parameters, if possible.
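A factory that resolves an overloaded SUM might look roughly like the following. The real UserDefinedAggregationFunctionAdapterFactory interface is not shown on this page, so SumAdapterFactory and its createFor method are hypothetical; returning null stands for "no adapter for these parameters". The interface is a trimmed stand-in containing only the members this sketch needs.

```java
import java.io.Serializable;

// Stand-in for JavaTypes.Type.
enum Type { INTEGER, LONG, FLOAT, DOUBLE, BIGDECIMAL, STRING }

// Trimmed stand-in: only the members this sketch needs.
interface UserDefinedAggregationFunctionAdapter<S extends Serializable> {
    Type[] getParameterTypes();
    Type getAggregateType();
    S createInitialState();
    S aggregate(S state, Serializable[] values);
    Object getAggregate(S state);
}

// SUM over integral values, kept exact as a long.
class LongSumAdapter implements UserDefinedAggregationFunctionAdapter<Long> {
    @Override public Type[] getParameterTypes() { return new Type[] { Type.LONG }; }
    @Override public Type getAggregateType() { return Type.LONG; }
    @Override public Long createInitialState() { return 0L; }
    @Override public Long aggregate(Long state, Serializable[] values) {
        return values[0] == null ? state : state + ((Number) values[0]).longValue();
    }
    @Override public Object getAggregate(Long state) { return state; }
}

// SUM over floating-point values.
class DoubleSumAdapter implements UserDefinedAggregationFunctionAdapter<Double> {
    @Override public Type[] getParameterTypes() { return new Type[] { Type.DOUBLE }; }
    @Override public Type getAggregateType() { return Type.DOUBLE; }
    @Override public Double createInitialState() { return 0.0; }
    @Override public Double aggregate(Double state, Serializable[] values) {
        return values[0] == null ? state : state + ((Number) values[0]).doubleValue();
    }
    @Override public Object getAggregate(Double state) { return state; }
}

// HYPOTHETICAL factory: picks the overload that fits the actual argument types.
final class SumAdapterFactory {
    static UserDefinedAggregationFunctionAdapter<?> createFor(Type[] argumentTypes) {
        if (argumentTypes.length != 1) return null; // SUM takes exactly one argument
        switch (argumentTypes[0]) {
            case INTEGER: case LONG: return new LongSumAdapter();
            case FLOAT: case DOUBLE: return new DoubleSumAdapter();
            default: return null;                   // e.g. STRING: no SUM overload
        }
    }
}
```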
createInitialState() serves as a factory for the very first state object of a group. An aggregation function adapter itself must not maintain any internal state other than the one held in its state object: while a separate state object is created for each group, the aggregation function itself may be shared among several queries.
From each new record, the query engine retrieves this aggregation function's argument columns and passes them to the method aggregate(Serializable, Serializable[]).
Given those input column values and the previous state, this method computes a new state object that reflects the values seen in the latest record. Once all records of a group have been processed this way, a call to getAggregate(Serializable), with the final state as input, determines the final aggregate result.
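The life cycle just described, together with the rule that one adapter instance may be shared while each group owns its own state, can be sketched as an engine-side loop. GroupedDriver is hypothetical and only illustrates the order of calls (createInitialState once per group, aggregate once per record, getAggregate once per final state); it is not the query engine's actual implementation. Type and the interface are trimmed stand-ins.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for JavaTypes.Type (only what this sketch uses).
enum Type { DOUBLE }

// Trimmed stand-in: only the members this sketch needs.
interface UserDefinedAggregationFunctionAdapter<S extends Serializable> {
    Type[] getParameterTypes();
    Type getAggregateType();
    S createInitialState();
    S aggregate(S state, Serializable[] values);
    Object getAggregate(S state);
}

// SUM adapter without fields: all information lives in the immutable Double state.
class DoubleSumAdapter implements UserDefinedAggregationFunctionAdapter<Double> {
    @Override public Type[] getParameterTypes() { return new Type[] { Type.DOUBLE }; }
    @Override public Type getAggregateType() { return Type.DOUBLE; }
    @Override public Double createInitialState() { return 0.0; }
    @Override public Double aggregate(Double state, Serializable[] values) {
        return values[0] == null ? state : state + ((Number) values[0]).doubleValue();
    }
    @Override public Object getAggregate(Double state) { return state; }
}

// Hypothetical engine-side loop: one shared adapter, one state object per group.
final class GroupedDriver {
    static <S extends Serializable> Map<String, Object> aggregateByGroup(
            UserDefinedAggregationFunctionAdapter<S> adapter,
            String[] groupKeys, Serializable[] argumentValues) {
        Map<String, S> states = new LinkedHashMap<>();
        for (int i = 0; i < groupKeys.length; i++) {
            // the first record of a group gets a fresh initial state
            S previous = states.computeIfAbsent(groupKeys[i], k -> adapter.createInitialState());
            // the returned object replaces the previous state of this group
            states.put(groupKeys[i], adapter.aggregate(previous, new Serializable[] { argumentValues[i] }));
        }
        // after all records are processed, turn each final state into its aggregate
        Map<String, Object> results = new LinkedHashMap<>();
        states.forEach((group, finalState) -> results.put(group, adapter.getAggregate(finalState)));
        return results;
    }
}
```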
As the window slides, new records qualify for the group and need to be considered by the aggregation, while old records become invalid and their removal needs to be reflected as well. With the methods described so far, it is possible to naively re-compute the aggregate for every window, but for performance reasons the query engine also allows for an incremental computation of aggregates.
As an example, assume the computation of a SUM of a particular column over the last 1000 records. As the window slides by one record, the aggregation function could re-iterate over the last 999 records plus the new record and sum up their column values. Alternatively, it could keep track of the total sum in its state and, as the window slides, subtract the value of the oldest, now invalidated record and add the value of the newly added record. The first alternative requires time linear in the size of the window, whereas the second requires only constant time per slide.
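The cost difference can be checked with a small self-contained comparison: naive re-sums the current window on every slide, while SlidingSum keeps a running total and applies one positive and at most one negative update per new record. The class is an illustration of the arithmetic only; unlike an adapter state, it mutates its own fields.

```java
import java.util.ArrayDeque;

final class SlidingSum {
    // O(window) per slide: re-sum every value currently in the window ending at `end`.
    static long naive(long[] values, int end, int window) {
        long sum = 0;
        for (int i = Math.max(0, end - window + 1); i <= end; i++) sum += values[i];
        return sum;
    }

    private final int window;
    private final ArrayDeque<Long> contents = new ArrayDeque<>();
    private long sum;

    SlidingSum(int window) { this.window = window; }

    // O(1) per slide: add the newest value, subtract the evicted one.
    long push(long value) {
        contents.addLast(value);
        sum += value;                      // positive: newly qualifying record
        if (contents.size() > window) {
            sum -= contents.removeFirst(); // negative: invalidated record
        }
        return sum;
    }
}
```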
The UserDefinedAggregationFunctionAdapter interface defines three methods aimed at the previously mentioned optimizations. The method supportsPN() signals to the query engine that this adapter is capable of computing the aggregate value incrementally. (PN stands for Positive/Negative and refers to records being added to (positive) or removed from (negative) a sliding window.)
If the adapter computes its aggregate incrementally, the method aggregate(Serializable, Serializable[]) only needs to update the state object for records that newly qualify for the window, whereas negativeCall(Serializable, Serializable[]) needs to "subtract" the invalidated records from the state object.
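A minimal sketch of a PN-enabled SUM adapter and a hypothetical driver for a count-based sliding window: every newly arriving record is passed to aggregate, and once the window overflows, the evicted record is passed to negativeCall. The stand-in interface, SumState and PnWindowDriver are assumptions made for this illustration, not the engine's code. The state is an immutable object, so both calls return new states.

```java
import java.io.Serializable;
import java.util.ArrayDeque;

enum Type { DOUBLE } // stand-in for JavaTypes.Type

// Stand-in assumed to mirror the method summary on this page.
interface UserDefinedAggregationFunctionAdapter<S extends Serializable> {
    Type[] getParameterTypes();
    Type getAggregateType();
    S createInitialState();
    S aggregate(S state, Serializable[] values);
    Object getAggregate(S state);
    S negativeCall(S state, Serializable[] values);
    boolean supportsPN();
    boolean isEmptyAggregate(S state);
}

// Immutable state: number of non-null values and their sum.
final class SumState implements Serializable {
    final long count;
    final double sum;
    SumState(long count, double sum) { this.count = count; this.sum = sum; }
}

class PnSumAdapter implements UserDefinedAggregationFunctionAdapter<SumState> {
    @Override public Type[] getParameterTypes() { return new Type[] { Type.DOUBLE }; }
    @Override public Type getAggregateType() { return Type.DOUBLE; }
    @Override public SumState createInitialState() { return new SumState(0, 0.0); }
    @Override public SumState aggregate(SumState s, Serializable[] v) {
        if (v[0] == null) return s;
        return new SumState(s.count + 1, s.sum + ((Number) v[0]).doubleValue());
    }
    @Override public SumState negativeCall(SumState s, Serializable[] v) {
        if (v[0] == null) return s; // nulls were never added
        return new SumState(s.count - 1, s.sum - ((Number) v[0]).doubleValue());
    }
    // SUM of an empty window is null in this sketch
    @Override public Object getAggregate(SumState s) { return s.count == 0 ? null : s.sum; }
    @Override public boolean supportsPN() { return true; }
    @Override public boolean isEmptyAggregate(SumState s) { return s.count == 0; }
}

// Hypothetical engine loop for a sliding window over the last `size` records.
final class PnWindowDriver {
    static <S extends Serializable> Object[] slide(UserDefinedAggregationFunctionAdapter<S> adapter,
                                                   double[] input, int size) {
        ArrayDeque<Serializable[]> window = new ArrayDeque<>();
        S state = adapter.createInitialState();
        Object[] results = new Object[input.length];
        for (int i = 0; i < input.length; i++) {
            Serializable[] row = { input[i] };
            window.addLast(row);
            state = adapter.aggregate(state, row);                         // positive record
            if (window.size() > size) {
                state = adapter.negativeCall(state, window.removeFirst()); // negative record
            }
            results[i] = adapter.getAggregate(state);
        }
        return results;
    }
}
```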
With the PN approach, the query engine cannot always decide whether an aggregation state has just been created or has become "empty" through a series of "subtractions". Therefore, isEmptyAggregate(Serializable) needs to be implemented meaningfully by a PN-enabled adapter.
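The ambiguity is easy to reproduce: a state that holds only the running sum looks the same after adding and then removing 5 as after adding 3 and -3, although only the first window is empty. Tracking the number of contained values, as in the hypothetical SumState below (and as the count in the AVG example does), lets an isEmptyAggregate implementation answer correctly.

```java
import java.io.Serializable;

// Hypothetical immutable PN state for SUM: number of contained values plus running sum.
final class SumState implements Serializable {
    final long count;
    final double sum;

    SumState(long count, double sum) { this.count = count; this.sum = sum; }

    SumState plus(double value) { return new SumState(count + 1, sum + value); }  // positive record
    SumState minus(double value) { return new SumState(count - 1, sum - value); } // negative record

    // What isEmptyAggregate would check: the sum alone cannot tell
    // "no values left" apart from "remaining values cancel out".
    boolean isEmpty() { return count == 0; }
}
```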
getAggregateType(), getParameterTypes() and supportsPN() must not throw any exceptions at all; otherwise, parsing and translating the query string into a runnable query already fails. Exceptions thrown in the other methods are caught and logged at query runtime but do not affect the overall processing of the query. Instead, if a method call fails with an exception, its result is null.
Therefore, the result of a call to a user-defined aggregation function is always considered nullable.
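On the engine side, the described behavior amounts to wrapping each runtime call in a handler that logs the exception and substitutes null. The following helper is a hypothetical illustration using java.util.logging; it is not taken from the query engine, and the name SafeInvoker is invented.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

final class SafeInvoker {
    private static final Logger LOG = Logger.getLogger(SafeInvoker.class.getName());

    // Hypothetical: run one adapter call; on failure, log the exception and yield null.
    static <T> T callOrNull(Supplier<T> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            LOG.log(Level.WARNING, "user-defined aggregation call failed", e);
            return null; // hence the aggregate is always nullable
        }
    }
}
```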
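Example: the following adapter computes the average of its argument values. It supports the PN approach, i.e. supportsPN returns true, and accordingly implements both negativeCall and isEmptyAggregate.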
public static class AverageAdapter implements UserDefinedAggregationFunctionAdapter<Number[]> {
    @Override
    public Type[] getParameterTypes() { return new Type[] { Type.DOUBLE }; }
    @Override
    public Type getAggregateType() { return Type.DOUBLE; }
    @Override
    public Number[] createInitialState() {
        // the first value is the count and the second the cumulative sum
        return new Number[] { 0L, 0d };
    }
    @Override
    public Number[] aggregate(Number[] state, Serializable[] values) {
        long n = (Long) state[0];
        double sum = (Double) state[1];
        // null-aware handling of input values
        if (values[0] != null) {
            sum += (Double) values[0];
            n += 1;
        }
        // always return a new state
        return new Number[] { n, sum };
    }
    @Override
    public Object getAggregate(Number[] state) {
        long n = (Long) state[0];
        double sum = (Double) state[1];
        // the average of an empty data set is null
        if (n == 0L) return null;
        return sum / n;
    }
    @Override
    public Number[] negativeCall(Number[] state, Serializable[] values) {
        // null values were never counted, so the state stays unchanged
        if (values[0] == null) return state;
        long n = (Long) state[0];
        double sum = (Double) state[1];
        n--;
        sum -= (Double) values[0];
        // always return a new state
        return new Number[] { n, sum };
    }
    @Override
    public boolean supportsPN() { return true; }
    @Override
    public boolean isEmptyAggregate(Number[] state) { return (Long) state[0] == 0L; }
}
| Modifier and Type | Method and Description |
|---|---|
| S | aggregate(S state, Serializable[] values): Combines the intermediate state of the aggregation process and the next values into a new intermediate state. |
| S | createInitialState(): Creates an empty initial state used for the aggregation. |
| Object | getAggregate(S state): Retrieves the aggregate from the aggregation state after all values have been aggregated. |
| JavaTypes.Type | getAggregateType(): Returns the type of the aggregate produced by this user-defined aggregation function. |
| JavaTypes.Type[] | getParameterTypes(): Returns the types of the values which are aggregated by this user-defined aggregation function. |
| boolean | isEmptyAggregate(S state): Returns whether the given state is an empty aggregate. |
| S | negativeCall(S state, Serializable[] values): Removes the given values from the intermediate state of the aggregation process, yielding a new intermediate state. |
| boolean | supportsPN(): Returns whether this user-defined aggregation function implements both the negativeCall method and the isEmptyAggregate method. |
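JavaTypes.Type[] getParameterTypes()
Returns the types of the values which are aggregated by this user-defined aggregation function.
JavaTypes.Type getAggregateType()
Returns the type of the aggregate produced by this user-defined aggregation function.
S createInitialState()
Creates an empty initial state used for the aggregation. This state is the starting point for the subsequent calls to aggregate and getAggregate.
S aggregate(S state, Serializable[] values)
Combines the intermediate state of the aggregation process and the next values into a new intermediate state. The values have the types returned by getParameterTypes. The state and the result have to be of the same type, which has to be the type of the result of createInitialState.
Important: The given state must not be changed!
Parameters:
state - the state prior to the aggregation step
values - the values to be aggregated
Object getAggregate(S state)
Retrieves the aggregate from the aggregation state after all values have been aggregated by aggregate. The returned aggregate has to be of the type returned by getAggregateType.
Parameters:
state - the state after all aggregation steps
S negativeCall(S state, Serializable[] values)
Removes the given values from the intermediate state of the aggregation process, yielding a new intermediate state. The values have the types returned by getParameterTypes. The state and the result have to be of the same type, which has to be the type of the result of createInitialState.
If the PN approach is used, this method has to be implemented. Otherwise, if supportsPN returns false, this method may throw an UnsupportedOperationException.
Important: The given state must not be changed!
Parameters:
state - the state prior to the aggregation step
values - the values to be removed from the state
boolean supportsPN()
Returns whether this user-defined aggregation function implements both the negativeCall method and the isEmptyAggregate method. If those methods are not implemented, this adapter cannot be used in the PN approach.
boolean isEmptyAggregate(S state)
Returns whether the given state is an empty aggregate. This method may throw an UnsupportedOperationException if supportsPN returns false.
Parameters:
state - the intermediate state
Returns:
true if the state is an empty aggregate, otherwise false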
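Since the given state must never be changed, a unit test can check this contract for array-based states such as the Number[] state of the AVG example: snapshot the array, run one step, and compare. StateContract, GOOD and BAD are hypothetical names for this sketch; the shallow clone suffices only because the elements (Long, Double) are themselves immutable.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.function.BiFunction;

final class StateContract {
    // Hypothetical test helper: true if applying `step` left the array-valued state untouched.
    static <T> boolean leavesStateUnchanged(T[] state, Serializable[] values,
                                            BiFunction<T[], Serializable[], T[]> step) {
        T[] snapshot = state.clone(); // shallow copy; enough for immutable elements
        step.apply(state, values);
        return Arrays.equals(snapshot, state);
    }

    // Correct aggregation step on a (count, sum) state: builds and returns a new array.
    static final BiFunction<Number[], Serializable[], Number[]> GOOD = (s, v) ->
        new Number[] { (Long) s[0] + 1, (Double) s[1] + (Double) v[0] };

    // Contract violation: updates the given array in place and returns it.
    static final BiFunction<Number[], Serializable[], Number[]> BAD = (s, v) -> {
        s[0] = (Long) s[0] + 1;
        s[1] = (Double) s[1] + (Double) v[0];
        return s;
    };
}
```

The same check can be applied to negativeCall, whose contract is identical.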