My Average Aggregate Example

This example, MyAverageAggregationFunction, is available in the sample user-defined functions package at <MashZone NextGen installation>/raql-udfs/SampleRaqlLib. It implements an aggregate function, myAverageFunction(Number column), similar to the built-in MashZone NextGen average function.

The aggregate method performs the intermediate calculations: for each non-null column value, it increments the count of values processed and adds the value to a running subtotal. Both of these intermediate values are stored in the state object. The getAggregate method then uses the state to calculate the average for the group, partition, or window.

package com.raql.samples;

import java.io.Serializable;

import com.jackbe.jbp.raql.udx.loader.RaqlFunc;

import de.rtm.push.adapters.UserDefinedAggregationFunctionAdapter;

/**
 * This aggregate adapter computes the average of a group, partition
 * or window.
 *
 * <p>The internal state is a two-element {@code Number[]}: index 0 holds the
 * count of non-null input values as a {@code Long}, index 1 holds their
 * cumulative sum as a {@code Double}. State arrays are never modified in
 * place; every state-changing method returns a newly allocated array.
 */
@RaqlFunc(name="myAverageFunction")
public class MyAverageAdapter implements UserDefinedAggregationFunctionAdapter<Number[]> {

    /**
     * Returns the types of the input parameters.
     *
     * @return a single-element array declaring one {@code DOUBLE} parameter
     */
    @Override
    public Type[] getParameterTypes() {
        return new Type[] {Type.DOUBLE};
    }

    /**
     * Returns the type of the result.
     *
     * @return {@code DOUBLE}
     */
    @Override
    public Type getAggregateType() {
        return Type.DOUBLE;
    }

    /**
     * Creates the initial state of the average.
     *
     * @return a new state with a count of zero and a sum of zero
     */
    @Override
    public Number[] createInitialState() {
        // the first value is the count and the second the cumulative sum
        return new Number[] {0L, 0d};
    }

    /**
     * Aggregates the current internal state with the new input values
     * and derives a new internal state. A {@code null} input value is
     * skipped, leaving count and sum unchanged.
     *
     * @param state The current internal state
     * @param values The new input values; only {@code values[0]} is read
     * @return The new internal state
     * @throws ClassCastException if {@code values[0]} is not a {@link Number}
     */
    @Override
    public Number[] aggregate(Number[] state, Serializable[] values) {
        long n = (Long) state[0];
        double sum = (Double) state[1];

        // null-aware handling of input values
        if (values[0] != null) {
            sum += toDouble(values[0]);
            n += 1;
        }

        // always return a new state
        return new Number[] {n, sum};
    }

    /**
     * Derives the final aggregate value from the current internal state.
     *
     * @param state the internal state
     * @return the aggregate value as a {@code Double}, or {@code null} when
     *         the count is zero
     */
    @Override
    public Object getAggregate(Number[] state) {
        long n = (Long) state[0];
        double sum = (Double) state[1];

        // the average of an empty data set is null
        if (n == 0L) {
            return null;
        }
        return sum / n;
    }

    /**
     * Removes the input values from the internal state.
     * This step is required for the support of the
     * positive/negative approach. A {@code null} input value is skipped,
     * mirroring {@link #aggregate(Number[], Serializable[])}.
     *
     * @param state the current internal state
     * @param values the input values; only {@code values[0]} is read
     * @return the new internal state
     * @throws ClassCastException if {@code values[0]} is not a {@link Number}
     */
    @Override
    public Number[] negativeCall(Number[] state, Serializable[] values) {
        long n = (Long) state[0];
        double sum = (Double) state[1];

        // a null value was never added, so there is nothing to remove
        if (values[0] != null) {
            n--;
            sum -= toDouble(values[0]);
        }

        // always return a new state, also for null input, so that callers
        // never receive the array they passed in
        return new Number[] {n, sum};
    }

    /**
     * Indicates support of positive/negative approach, which can add and
     * remove values from the internal state; aggregate analytic functions
     * supporting that approach allow for a more efficient evaluation
     * when windows are used.
     *
     * @return indicates whether PN approach is supported
     */
    @Override
    public boolean supportsPN() {
        return true;
    }

    /**
     * Indicates whether the internal state is empty.
     *
     * @param state the internal state
     * @return {@code true} if the count stored in the state is zero
     */
    @Override
    public boolean isEmptyAggregate(Number[] state) {
        return ((Long) state[0]).longValue() == 0L;
    }

    /**
     * Converts a non-null input value to a primitive double.
     * Going through {@link Number#doubleValue()} instead of casting to
     * {@code Double} accepts any numeric input type (Integer, Long, ...).
     */
    private static double toDouble(Serializable value) {
        return ((Number) value).doubleValue();
    }
}

This example illustrates that the state object can track multiple properties. In this case, state tracks both the number of non-null values processed so far and the running subtotal of the values in the column being averaged.