Write Window Analytic Functions for RAQL
With window analytic functions, each function is a single class that implements the UserDefinedWindowFunctionAdapter interface (de.rtm.push.adapters.windowfunctions package) from the MashZone NextGen RAQL User Defined Function API.
Unlike plain functions, window analytic functions have access to all rows (records) within the current partition or window, which they can use to perform calculations. Unlike aggregate analytic functions, however, they return a separate result for each record.
To accomplish this, window analytic functions use the following methods:
createInitialState(): to reset state for the current window or partition.
checkWindowSpecification(boolean isPartitioned, boolean isOrdered, WindowFrameSpecification windowFrameSpec): to validate that the window definition meets requirements such as being sorted.
call(S state, UserDefinedWindowFunctionAdapter.PartitionEntry currentEntry, UserDefinedWindowFunctionAdapter.Partition partition, int currentIndex, WindowFrame currentWindowSpec, WindowFrame prevWindowSpec): window analytic functions must implement this method with the core logic of the function and return the result of the calculation as WindowFunctionResult<S,O>.
There are also basic 'housekeeping' methods: getParameterTypes() and getReturnType().
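To make the division of labor among these methods concrete, here is a minimal, self-contained sketch of how an engine might drive a window function. Everything in it (SimpleWindowFunction, Result, evaluate, RUNNING_TOTAL) is a hypothetical stand-in and not part of the RAQL User Defined Function API. It only illustrates the idea that state is created once per partition and then passed from one call to the next, while each call yields a result for one row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowLifecycleSketch {

    /** Hypothetical stand-in for a result: state for the next call plus output for this row. */
    public static final class Result<S, O> {
        final S state;
        final O output;
        public Result(S state, O output) { this.state = state; this.output = output; }
    }

    /** Hypothetical, simplified stand-in for a window function (not the real interface). */
    public interface SimpleWindowFunction<S, O> {
        S createInitialState();
        Result<S, O> call(S state, List<Double> partition, int currentIndex);
    }

    /** Drives the function the way an engine might: fresh state per partition, one call per row. */
    public static <S, O> List<O> evaluate(SimpleWindowFunction<S, O> fn, List<Double> partition) {
        List<O> outputs = new ArrayList<>();
        S state = fn.createInitialState();      // reset for this partition
        for (int i = 0; i < partition.size(); i++) {
            Result<S, O> r = fn.call(state, partition, i);
            outputs.add(r.output);              // one result per row
            state = r.state;                    // carried into the next call
        }
        return outputs;
    }

    /** A running total written against the simplified interface. */
    public static final SimpleWindowFunction<Double, Double> RUNNING_TOTAL =
        new SimpleWindowFunction<Double, Double>() {
            @Override public Double createInitialState() { return 0d; }
            @Override public Result<Double, Double> call(Double state, List<Double> partition, int i) {
                double total = state + partition.get(i);
                return new Result<>(total, total);
            }
        };

    public static void main(String[] args) {
        // prints [1.0, 3.0, 6.0]
        System.out.println(evaluate(RUNNING_TOTAL, Arrays.asList(1.0, 2.0, 3.0)));
    }
}
```

Because the function object itself holds no mutable fields, the same instance could be reused for another partition simply by starting again from createInitialState().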
You set up your development environment for window analytic functions just the same as for plain functions. See Set Up Your Development Environment for details.
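We’re going to use two examples. The first example shows the basics of a window analytic function and how to track state and set the function result. This example implements a simple sum. To create this function, you construct and initialize the window analytic function class and then implement the call method for the window analytic function.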
The second example implements the MashZone NextGen built-in lead function, which illustrates techniques to Work with Specific Records in Window Calculations using the current position of a record.
Construct and Initialize the Window Analytic Function Class
This example, MySumWindowFunction.java, is available in the sample user-defined functions package at MashZoneNG-install/raql-udfs/SampleRaqlLib.
Your window analytic function class imports the RAQL UDF annotation class, com.jackbe.jbp.raql.udx.loader.RaqlFunc, and various classes in the MashZone NextGen RAQL User Defined Function API, and implements the UserDefinedWindowFunctionAdapter interface:
package com.raql.samples;
import java.util.Arrays;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.Adapters;
import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;
import de.rtm.push.adapters.windowfunctions.WindowFrame;
import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;
import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;
import de.rtm.util.exception.IncompatibleWindowSpecificationException;
import de.rtm.util.records.metadata.FieldMetaData;
import de.rtm.util.records.metadata.FieldMetaDatas;
import de.rtm.util.records.types.JavaTypes;
import de.rtm.util.records.types.JavaTypes.Type;
/**
* This window function adapter computes the sum of a window.
*/
@RaqlFunc(name="mySumFunction")
public static class WindowSum implements UserDefinedWindowFunctionAdapter<Double, Double> {
protected final Type[] parameterTypes;
public WindowSum(Type[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
/**
* Returns the type of the result, which is simply always Double.
*/
@Override public Type getReturnType() {
return Type.DOUBLE;
}
/**
* Returns the types of the input parameters.
*/
@Override public Type[] getParameterTypes() {
return parameterTypes;
}
/**
* Creates the initial state of the window sum, which is 0.
*/
@Override
public Double createInitialState() {
return 0d;
}
}
This example overrides the default implementation for createInitialState to reset the sum to zero.
Implement the call Method for the Window Analytic Function
You implement the call method with the core logic for your window analytic function. The state parameter represents either the initial state (for the first window evaluation) or the state computed from the previous window evaluation.
Note: User-defined functions should be stateless wherever possible as a best practice. Instead, you can manage intermediate state information for window calculations in the state property of the WindowFunctionResult.
The call method also has parameters for the current row within the current partition, the current partition, the index of the current row, and the specifications for the current window and the previous window that define the context for the function.
...
/**
* Computes the sum for the current window. Instead of simply summing
* up the values of the current window, only the required values
* from current and previous window are combined with the state of
* the previous window evaluation. This approach allows for a more
* efficient evaluation.
*
* @param state The state which is either the initial state for
* the first window evaluation or the state as
* computed during the previous window evaluation
* @param currentEntry The current row of the partition being
* processed
* @param partition The partition being processed
* @param currentIndex The index of the current row being
* processed
* @param currentWindowSpec The specification of the current window
* frame within the partition
* @param previousWindowSpec The specification of the previous window
* frame within the partition
* @return The result of the window function for the current window
*/
@Override
public WindowFunctionResult<Double, Double> call(Double state, PartitionEntry currentEntry,
Partition partition, int currentIndex, WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {
// initialize new sum with old sum from state
Double newSum = state;
// if previous window has values, remove any from the subtotal that
// are not in current window
boolean valuesInPreviousWindow = previousWindowSpec != null && !previousWindowSpec.isEmpty();
if (valuesInPreviousWindow) {
final int removeTo = Math.min(currentWindowSpec.getStartIndex(), previousWindowSpec.getEndIndex());
for (int i = previousWindowSpec.getStartIndex(); i < removeTo; i++) {
newSum -= ((Number) partition.get(i).getColumnValue(0)).doubleValue();
}
}
// and add values from current window that are not in previous window
final int addFrom = !valuesInPreviousWindow?currentWindowSpec.getStartIndex()
:Math.max(currentWindowSpec.getStartIndex(), previousWindowSpec.getEndIndex());
for (int i = addFrom; i < currentWindowSpec.getEndIndex(); i++) {
newSum += ((Number)partition.get(i).getColumnValue(0)).doubleValue();
}
// return the new sum as state for use in the next window
// evaluation and as result for the current row
return Adapters.createWindowFunctionResult(newSum, newSum);
}
@Override
public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered, WindowFrameSpecification
windowFrameSpecification) throws IncompatibleWindowSpecificationException {
// nothing to be done
}
}
The call method returns an object that implements the WindowFunctionResult interface. WindowFunctionResult instances contain:
The result of the function, in this case the sum of this numeric column for the current row in the current window and partition.
State information needed to apply this function to the next row.
In this example, the state is also simply the result of the function. The calculation of the sum for each row uses this as a starting point and then backs out values for any rows that are no longer considered part of the window and adds in values for any new rows in the window.
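The incremental update described above can be tried outside the RAQL API with a small, self-contained sketch. The Frame class and the updateSum method are hypothetical stand-ins, and the sketch assumes that a frame covers rows from a start index (inclusive) to an end index (exclusive), as the loops in the example suggest.

```java
public class IncrementalWindowSumSketch {

    /** Hypothetical stand-in for a window frame: rows start (inclusive) to end (exclusive). */
    public static final class Frame {
        final int start;
        final int end;
        public Frame(int start, int end) { this.start = start; this.end = end; }
        boolean isEmpty() { return start >= end; }
    }

    /** Updates the previous sum instead of re-adding every row in the current frame. */
    public static double updateSum(double previousSum, double[] values, Frame current, Frame previous) {
        double sum = previousSum;
        boolean hasPrevious = previous != null && !previous.isEmpty();
        if (hasPrevious) {
            // back out rows that were in the previous frame but precede the current one
            int removeTo = Math.min(current.start, previous.end);
            for (int i = previous.start; i < removeTo; i++) {
                sum -= values[i];
            }
        }
        // add rows of the current frame that the previous frame did not cover
        int addFrom = hasPrevious ? Math.max(current.start, previous.end) : current.start;
        for (int i = addFrom; i < current.end; i++) {
            sum += values[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        double[] values = {1, 2, 3, 4, 5};
        Frame first = new Frame(0, 2);    // rows 0 and 1
        Frame second = new Frame(1, 3);   // rows 1 and 2
        double s1 = updateSum(0d, values, first, null);     // 1 + 2
        double s2 = updateSum(s1, values, second, first);   // 3 - 1 + 3
        System.out.println(s1 + " " + s2);                  // prints "3.0 5.0"
    }
}
```

The sketch only touches rows that enter or leave the frame, so the work per row depends on how far the frame moves rather than on the frame size. It assumes that frame boundaries never move backward from one row to the next.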
Lastly, the class implements the checkWindowSpecification method, which the interface requires, as a no-op method.
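A function whose result depends on row order might instead reject a window definition that is not sorted. The following is a minimal sketch of that idea under stated assumptions: IncompatibleSpecException is a local, hypothetical stand-in for the API's IncompatibleWindowSpecificationException (whose constructors are not shown in this topic), and the WindowFrameSpecification parameter is omitted to keep the sketch self-contained.

```java
public class WindowSpecCheckSketch {

    /** Hypothetical stand-in for the API's IncompatibleWindowSpecificationException. */
    public static class IncompatibleSpecException extends Exception {
        public IncompatibleSpecException(String message) { super(message); }
    }

    /** Rejects window definitions that have no ordering; the frame parameter is omitted here. */
    public static void checkWindowSpecification(boolean isPartitioned, boolean isOrdered)
            throws IncompatibleSpecException {
        if (!isOrdered) {
            throw new IncompatibleSpecException("this function requires an ordered window");
        }
        // partitioning is optional for this hypothetical function, so isPartitioned is not checked
    }

    public static void main(String[] args) {
        try {
            checkWindowSpecification(true, false);
        } catch (IncompatibleSpecException e) {
            // prints "rejected: this function requires an ordered window"
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing in the check rather than in call reports the problem once for the window definition instead of once per row.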
Complete WindowSum Example
package com.raql.samples;
import java.util.Arrays;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.Adapters;
import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;
import de.rtm.push.adapters.windowfunctions.WindowFrame;
import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;
import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;
import de.rtm.util.exception.IncompatibleWindowSpecificationException;
import de.rtm.util.records.metadata.FieldMetaData;
import de.rtm.util.records.metadata.FieldMetaDatas;
import de.rtm.util.records.types.JavaTypes;
import de.rtm.util.records.types.JavaTypes.Type;
/**
* This window function adapter computes the sum of a window.
*/
@RaqlFunc(name="mySumFunction")
public static class WindowSum implements UserDefinedWindowFunctionAdapter<Double, Double> {
protected final Type[] parameterTypes;
public WindowSum(Type[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
/**
* Returns the type of the result, which is simply always Double.
*/
@Override public Type getReturnType() {
return Type.DOUBLE;
}
/**
* Returns the types of the input parameters.
*/
@Override public Type[] getParameterTypes() {
return parameterTypes;
}
/**
* Creates the initial state of the window sum, which is 0.
*/
@Override
public Double createInitialState() {
return 0d;
}
/**
* Computes the sum for the current window. Instead of simply summing
* up the values of the current window, only the required values
* from current and previous window are combined with the state of
* the previous window evaluation. This approach allows for a more
* efficient evaluation.
*
* @param state The state which is either the initial state for
* the first window evaluation or the state as
* computed during the previous window evaluation
* @param currentEntry The current row of the partition being
* processed
* @param partition The partition being processed
* @param currentIndex The index of the current row being
* processed
* @param currentWindowSpec The specification of the current window
* frame within the partition
* @param previousWindowSpec The specification of the previous window
* frame within the partition
* @return The result of the window function for the current window
*/
@Override
public WindowFunctionResult<Double, Double> call(Double state, PartitionEntry currentEntry,
Partition partition, int currentIndex, WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {
// initialize new sum with old sum from state
Double newSum = state;
// if previous window has values, remove any from the subtotal that
// are not in current window
boolean valuesInPreviousWindow = previousWindowSpec != null && !previousWindowSpec.isEmpty();
if (valuesInPreviousWindow) {
final int removeTo = Math.min(currentWindowSpec.getStartIndex(), previousWindowSpec.getEndIndex());
for (int i = previousWindowSpec.getStartIndex(); i < removeTo; i++) {
newSum -= ((Number) partition.get(i).getColumnValue(0)).doubleValue();
}
}
// and add values from current window that are not in previous window
final int addFrom = !valuesInPreviousWindow?currentWindowSpec.getStartIndex()
:Math.max(currentWindowSpec.getStartIndex(), previousWindowSpec.getEndIndex());
for (int i = addFrom; i < currentWindowSpec.getEndIndex(); i++) {
newSum += ((Number)partition.get(i).getColumnValue(0)).doubleValue();
}
// return the new sum as state for use in the next window
// evaluation and as result for the current row
return Adapters.createWindowFunctionResult(newSum, newSum);
}
@Override
public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered,
WindowFrameSpecification windowFrameSpecification) throws IncompatibleWindowSpecificationException {
// nothing to be done
}
}
Work with Specific Records in Window Calculations
This example is a window analytic function similar to the MashZone NextGen built-in lead function, which returns the value of the specified column for a row (a record) that follows the current row by a specific offset:
package com.raql.samples;
import java.io.Serializable;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.Adapters;
import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;
import de.rtm.push.adapters.windowfunctions.WindowFrame;
import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;
import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;
import de.rtm.util.exception.IncompatibleWindowSpecificationException;
import de.rtm.util.records.metadata.FieldMetaData;
import de.rtm.util.records.metadata.FieldMetaDatas;
import de.rtm.util.records.types.JavaTypes.Type;
/**
* This window function adapter determines the lead value for a
* specific row based on an offset within a window.
*/
@RaqlFunc(name="myLeadFunction")
public static class Lead implements UserDefinedWindowFunctionAdapter<Serializable, Serializable> {
private final static Serializable DEFAULT_DEFAULT_VALUE = null;
private final static int DEFAULT_OFFSET = 1;
protected final Type returnType;
protected final Type[] parameterTypes;
public Lead(Type[] parameterTypes) {
this.returnType = parameterTypes[0];
this.parameterTypes = parameterTypes;
}
/**
* Returns the type of the result.
*/
@Override
public Type getReturnType() {
return returnType;
}
/**
* Returns the types of the input parameters.
*/
@Override
public Type[] getParameterTypes() {
return parameterTypes;
}
/**
* Creates the initial state.
*/
@Override
public Serializable createInitialState() {
return null;
}
/**
* Determines the lead value of the current window. This has two
* optional parameters: the offset and the default value that is
* returned when the lead row lies outside the current window.
*
* @return The result of the window function for the current window
*/
@Override
public WindowFunctionResult<Serializable, Serializable> call(Serializable state,
PartitionEntry currentEntry, Partition partition, int currentIndex, WindowFrame currentWindowSpec,
WindowFrame previousWindowSpec) {
final Serializable[] columnValues = currentEntry.getColumnValues();
// when the offset is not specified, use the default offset, which is 1
final int offset = columnValues.length > 1 ? (Integer)columnValues[1] : DEFAULT_OFFSET;
final int index = currentIndex + offset;
final Serializable result;
if (index < currentWindowSpec.getStartIndex() || index >= currentWindowSpec.getEndIndex())
// when the default value is not specified, use the default default value, which is null
result = columnValues.length > 2 ? columnValues[2] : DEFAULT_DEFAULT_VALUE;
else
result = partition.get(index).getColumnValue(0);
return Adapters.createWindowFunctionResult(result, result);
}
@Override
public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered, WindowFrameSpecification
windowFrameSpecification) throws IncompatibleWindowSpecificationException {
// nothing to be done
}
}
To work with a specific record relative to the current record, this function uses both the index of the current record and the specification of the current window.
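The index arithmetic can be isolated in a short, self-contained sketch. The lead method below is a hypothetical helper that works on a plain list rather than on the API's partition and window frame types, and it assumes the frame covers rows from frameStart (inclusive) to frameEnd (exclusive).

```java
import java.util.Arrays;
import java.util.List;

public class LeadLookupSketch {

    /**
     * Returns the value that lies offset rows after currentIndex, or defaultValue
     * when that row falls outside the frame [frameStart, frameEnd).
     */
    public static <T> T lead(List<T> partition, int currentIndex, int offset,
                             int frameStart, int frameEnd, T defaultValue) {
        int index = currentIndex + offset;
        if (index < frameStart || index >= frameEnd) {
            return defaultValue;
        }
        return partition.get(index);
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");
        System.out.println(lead(rows, 0, 1, 0, 3, "none")); // prints "b"
        System.out.println(lead(rows, 2, 1, 0, 3, "none")); // prints "none"
    }
}
```

Because the lower bound of the frame is checked as well, passing a negative offset in this sketch looks backward instead of forward.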