Write Aggregate Analytic Functions for RAQL
An aggregate analytic function is a single class that implements the UserDefinedAggregationFunctionAdapter interface in the de.rtm.push.adapters package of the Presto RAQL User Defined Function API.
Like window analytic functions, aggregate analytic functions have access to all rows (records) in the current group, partition or window. They perform an aggregate calculation that uses all of these records to return a single result. Depending on where they are used in a RAQL query, this single result may be all that the query returns, or it may be returned as a column on every row.
To accomplish this, aggregate analytic functions use the following methods:
*createInitialState(): optional. Resets the aggregate state for the current window, partition or group to the value or state that the calculation should begin from.
*aggregate(S state, Serializable[] values): performs an intermediate calculation using the current state and the value of the next record to obtain a new intermediate state.
*getAggregate(S state): implements the final computation for the group, partition or window, if any, and returns the final aggregate result once all intermediate calculations are complete.
There are also several 'housekeeping' methods you must implement: getAggregateType(), getParameterTypes(), isEmptyAggregate(S state), negativeCall(S state, Serializable[] values) and supportsPN().
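The call order described above can be sketched in plain Java. This is a minimal illustration, not the Presto API: the CoreAggregate interface, the SumAggregate class and the evaluate driver are hypothetical stand-ins that only mirror the shapes of the three core methods, and the assumption that the engine calls them in this order for each group, partition or window is ours.

```java
import java.io.Serializable;

class AggregateLifecycleSketch {
    // Assumed call order: one initial state, one aggregate call per record,
    // then one final getAggregate call.
    static <S> Object evaluate(CoreAggregate<S> fn, Serializable[][] rows) {
        S state = fn.createInitialState();
        for (Serializable[] row : rows) {
            state = fn.aggregate(state, row);
        }
        return fn.getAggregate(state);
    }

    public static void main(String[] args) {
        Serializable[][] rows = { {1.5d}, {null}, {2.5d} };
        System.out.println(evaluate(new SumAggregate(), rows)); // prints 4.0
    }
}

// Hypothetical stand-in for the three core methods; NOT the Presto interface.
interface CoreAggregate<S> {
    S createInitialState();
    S aggregate(S state, Serializable[] values);
    Object getAggregate(S state);
}

// A sum aggregate whose state is a single running total.
class SumAggregate implements CoreAggregate<Double> {
    public Double createInitialState() {
        return 0d;
    }

    public Double aggregate(Double state, Serializable[] values) {
        // null column values leave the state unchanged
        return values[0] == null ? state : state + (Double) values[0];
    }

    public Object getAggregate(Double state) {
        return state;
    }
}
```

The housekeeping methods are left out of this sketch on purpose; a real adapter must implement them as well.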
Set up your development environment for aggregate analytic functions the same way as for plain functions. See Set Up Your Development Environment for details. Then write the class for your aggregate analytic function and follow Configure, Compile, Deploy and Test User-Defined Functions.
We’re going to use two examples. The first example, My Average Aggregate Example, is an implementation of the built-in avg(Number column) function. This example shows the basics of an aggregate analytic function: how to track state for intermediate steps and then perform the final calculation.
The second example, Kurtosis Using a Third Party Library, uses methods in the Apache Commons Math library to calculate the kurtosis for a column. This is an example of how to use third-party libraries.
My Average Aggregate Example
This example, MyAverageAggregationFunction, is available in the sample user-defined functions package at presto-install/raql-udfs/SampleRaqlLib. It implements an aggregate function, myAverageFunction(Number column), similar to the Presto built-in average function.
The aggregate method performs intermediate calculations. This increments the number of rows processed and adds the current column value to a subtotal. Both of these intermediate values are added to the state object. The getAggregate method then uses state to calculate the group/partition/window average.
package com.raql.samples;

import java.io.Serializable;
import java.util.Arrays;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.DefaultSignature;
import de.rtm.push.adapters.UserDefinedAggregationFunctionAdapter;
import de.rtm.util.exception.UserDefinedAggregationFunctionException;
import de.rtm.util.records.metadata.FieldMetaData;
import de.rtm.util.records.metadata.FieldMetaDatas;
import de.rtm.util.records.types.JavaTypes;
import de.rtm.util.records.types.JavaTypes.Type;
/**
* This aggregate adapter computes the average of a group, partition
* or window.
*/
@RaqlFunc(name="myAverageFunction")
public class MyAverageAdapter implements UserDefinedAggregationFunctionAdapter<Number[]> {
/**
* Returns the types of the input parameters.
*/
@Override
public Type[] getParameterTypes() {
return new Type[] {Type.DOUBLE};
}
/**
* Returns the type of the result.
*/
@Override
public Type getAggregateType() {
return Type.DOUBLE;
}
/**
* Creates the initial state of the average.
*/
@Override
public Number[] createInitialState() {
// the first value is the count and the second the cumulative sum
return new Number[] {0L, 0d};
}
/**
* Aggregates the current internal state with the new input values
* and derives a new internal state.
* @param state The current internal state
* @param values The new input values
* @return The new internal state
*/
@Override
public Number[] aggregate(Number[] state, Serializable[] values) {
Long n = (Long) state[0];
Double sum = (Double) state[1];
// null-aware handling of input values
if (values[0] != null) {
sum += (Double) values[0];
n += 1;
}
// always return a new state
return new Number[] {n, sum};
}
/**
* Derives the final aggregate value from the current internal state.
* @param state the internal state
* @return the aggregate value
*/
@Override
public Object getAggregate(Number[] state) {
long n = (Long) state[0];
double sum = (Double) state[1];
// the average of an empty data set is null
if (n == 0L)
return null;

return sum/n;
}
/**
* Removes the input values from the internal state.
* This step is required for the support of the
* positive/negative approach.
* @param state the current internal state
* @param values the input values
* @return the new internal state
*/
@Override
public Number[] negativeCall(Number[] state, Serializable[] values) {
if (values[0] == null)
return state;

long n = (Long) state[0];
double sum = (Double) state[1];

n--;
sum -= (Double) values[0];

// always return a new state
return new Number[] {n, sum};
}

/**
* Indicates support of positive/negative approach, which can add and
* remove values from the internal state; aggregate analytic functions
* supporting that approach allow for a more efficient evaluation
* when windows are used.
* @return indicates whether PN approach is supported
*/
@Override
public boolean supportsPN() {
return true;
}
/**
* Indicates whether the internal state is empty.
* @param state the internal state
* @return indicates whether the internal state is empty
*/
@Override
public boolean isEmptyAggregate(Number[] state) {
return (Long) state[0] == 0;
}

}
This example illustrates that the state object can track multiple properties. In this case, state tracks both the number of rows processed so far and the subtotal of the values for the column being averaged.
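The positive/negative approach mentioned in the comments for negativeCall and supportsPN can be illustrated with a sliding window. The sketch below is plain Java with hypothetical names (SlidingAverageSketch, positive, negative, slidingAverages) and keeps the same count-and-sum state as the example. It assumes the engine adds each record that enters a window and removes each record that leaves it; how Presto actually schedules these calls is not shown in this topic.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAverageSketch {
    // Positive step: a record enters the window. state = {count, sum}.
    static double[] positive(double[] state, double value) {
        return new double[] {state[0] + 1, state[1] + value};
    }

    // Negative step: a record leaves the window.
    static double[] negative(double[] state, double value) {
        return new double[] {state[0] - 1, state[1] - value};
    }

    // Averages over every full window of the given size, updating the
    // state incrementally instead of re-reading the whole window.
    static List<Double> slidingAverages(double[] values, int size) {
        List<Double> result = new ArrayList<>();
        double[] state = {0, 0};
        for (int i = 0; i < values.length; i++) {
            state = positive(state, values[i]);
            if (i >= size) {
                state = negative(state, values[i - size]);
            }
            if (i >= size - 1) {
                result.add(state[1] / state[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [2.0, 3.0, 4.0]
        System.out.println(slidingAverages(new double[] {1, 2, 3, 4, 5}, 3));
    }
}
```

Each step touches only the record that enters and the record that leaves, which is why an aggregate that supports this approach can be evaluated more efficiently over windows.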
Kurtosis Using a Third Party Library
Kurtosis is a statistical measure of 'peakedness' in the values for a dataset compared to a normal distribution. This indicates how closely the distribution matches the rounded bell shape of a normal distribution.
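To make the measure concrete, the following plain Java sketch computes a bias-corrected sample kurtosis directly. The formula is assumed here to be the one that Commons Math documents for its kurtosis statistic, namely n(n+1)/((n-1)(n-2)(n-3)) * sum((x - mean)^4) / std^4 - 3(n-1)^2/((n-2)(n-3)), where std is the sample standard deviation. Verify it against the library's Javadoc before relying on it. The class and method names are hypothetical.

```java
class KurtosisSketch {
    // Bias-corrected sample kurtosis; returns NaN for fewer than four values.
    static double kurtosis(double[] x) {
        int n = x.length;
        if (n < 4) {
            return Double.NaN;
        }
        double mean = 0;
        for (double v : x) {
            mean += v;
        }
        mean /= n;

        double sumSq = 0;      // sum of squared deviations
        double sumFourth = 0;  // sum of fourth-power deviations
        for (double v : x) {
            double d = v - mean;
            sumSq += d * d;
            sumFourth += d * d * d * d;
        }
        double variance = sumSq / (n - 1);  // sample variance, so std^4 = variance^2

        double a = (double) n * (n + 1) / ((n - 1.0) * (n - 2) * (n - 3));
        double b = 3.0 * (n - 1) * (n - 1) / ((n - 2.0) * (n - 3));
        return a * sumFourth / (variance * variance) - b;
    }

    public static void main(String[] args) {
        // evenly spaced values are flatter than a normal distribution,
        // so the result is negative (approximately -1.2)
        System.out.println(kurtosis(new double[] {1, 2, 3, 4, 5}));
    }
}
```

A value near zero indicates a shape close to a normal distribution; positive values indicate a sharper peak with heavier tails, and negative values a flatter shape.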
In this example, we use the implementation of kurtosis provided in the Apache Commons Math library, version 2.2. The kurtosis calculation in the DescriptiveStatistics class of this library expects the values of the distribution as an array of primitive values.
To support this, the aggregate method builds an array from the column values for records in a group, partition or window. The getAggregate method then uses this array to perform the calculation. As always, the state object is used to hold state for both methods.
package com.raql.samples;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import com.jackbe.jbp.common.MessageSourceAccessor;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.DefaultSignature;
import de.rtm.push.adapters.UserDefinedAggregationFunctionAdapter;
import de.rtm.util.exception.UserDefinedAggregationFunctionException;
import de.rtm.util.records.metadata.FieldMetaData;
import de.rtm.util.records.metadata.FieldMetaDatas;
import de.rtm.util.records.types.JavaTypes;
import de.rtm.util.records.types.JavaTypes.Type;

/**
* This adapter computes the kurtosis for a group, partition or window.
*/
@RaqlFunc(name="myKurtosisFunction")
public class KurtosisAdapter implements UserDefinedAggregationFunctionAdapter<double[]> {
/**
* Returns the types of the input parameters.
*/
@Override
public Type[] getParameterTypes() {
return new Type[] {Type.DOUBLE};
}
/**
* Returns the type of the result.
*/
@Override
public Type getAggregateType() {
return Type.DOUBLE;
}
/**
* Creates the initial state of the kurtosis. Note that
* the state solely stores the input values. Therefore
* the initial state is an empty array.
* @return the initial state of the kurtosis
*/
@Override
public double[] createInitialState() {
return new double[] {};
}
/**
* Aggregates the current internal state with the new input values
* and derives a new internal state. As the kurtosis aggregate is
* computed by an external library, the state solely stores
* the incoming values.
* @param state the current internal state
* @param values the new input values
* @return the new internal state
*/
@Override
public double[] aggregate(double[] state, Serializable[] values) {
if (values[0] != null) {
double[] newState = new double[state.length+1];
System.arraycopy(state, 0, newState, 0, state.length);
newState[newState.length-1] = (Double) values[0];
return newState;
}
else
return state;
}
/**
* Derives the return value from the current internal state.
* The kurtosis aggregate is computed by calling an external
* library with all input values.
* @param state the internal state
* @return the kurtosis value
*/
@Override
public Object getAggregate(double[] state) {
DescriptiveStatistics ds = new DescriptiveStatistics(state);
double kurtosis = ds.getKurtosis();
if (Double.isNaN(kurtosis))
return 0.0;
else
return kurtosis;
}

/**
* Removes the input values from the internal state.
* This step is required for the support of the
* positive/negative approach.
* @param state the current internal state
* @param values the input values
* @return the new internal state
*/
@Override
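public double[] negativeCall(double[] state, Serializable[] values) {
// nothing to remove for a null input or an empty state
if (values[0] == null || state.length == 0)
return state;
else {
boolean gotValue = false;
double[] newState = new double[state.length-1];
double value = (Double)values[0];
// copy every entry except the first match; the loop is bounded by
// newState so that neither array is indexed out of range
// (if the value is not present, the last entry is dropped)
for (int i = 0; i < newState.length; i++) {
gotValue |= state[i] == value;
newState[i] = state[i + (gotValue ? 1 : 0)];
}
return newState;
}
}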

/**
* Indicates support for positive/negative approach, which allows
* adding or removing values from the internal state;
* This approach provides a more efficient evaluation
* when windows are used.
* @return indicates whether PN approach is supported
*/
@Override
public boolean supportsPN() {
return true;
}
/**
* Indicates whether the internal state is empty.
* @param state the internal state
* @return indicates whether the internal state is empty
*/
@Override
public boolean isEmptyAggregate(double[] state) {
return state.length == 0;
}
}
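The array bookkeeping that aggregate and negativeCall perform on the state can also be written as a pair of standalone helpers. This is a sketch with hypothetical names (ArrayStateSketch, append, removeFirst), independent of the Presto API. Unlike the listing above, removeFirst returns the state unchanged when the value is not present, which is a design choice of this sketch.

```java
import java.util.Arrays;

class ArrayStateSketch {
    // Returns a new array with value appended; the input is not modified.
    static double[] append(double[] state, double value) {
        double[] next = Arrays.copyOf(state, state.length + 1);
        next[state.length] = value;
        return next;
    }

    // Returns a new array without the first occurrence of value,
    // or the original array when value is not present.
    static double[] removeFirst(double[] state, double value) {
        for (int i = 0; i < state.length; i++) {
            if (state[i] == value) {
                double[] next = new double[state.length - 1];
                System.arraycopy(state, 0, next, 0, i);
                System.arraycopy(state, i + 1, next, i, state.length - i - 1);
                return next;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        double[] state = append(append(append(new double[0], 1.0), 2.0), 3.0);
        // prints [1.0, 3.0]
        System.out.println(Arrays.toString(removeFirst(state, 2.0)));
    }
}
```

Because every call copies the array, the cost of each step grows with the size of the window. That is acceptable for a sample, but worth keeping in mind for large partitions.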
To compile this example, you must include the Apache Commons Math library, version 2.2, in the classpath. You may include the jar file for this library in the lib folder for the user-defined function library. This specific library is also used in Presto, so you can also simply add the jar file for this library to the classpath. See Statistics and Analytics Third-Party Libraries for information.
Copyright © 2013-2015 Software AG, Darmstadt, Germany.
