This example is a window analytics function similar to the MashZone NextGen built-in lead function, which returns the value of the specified column from the row (record) that follows the current row by a specific offset:
package com.raql.samples;
import java.io.Serializable;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.Adapters;
import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;
import de.rtm.push.adapters.windowfunctions.WindowFrame;
import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;
import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;
import de.rtm.util.exception.IncompatibleWindowSpecificationException;
/**
 * Window function adapter that looks ahead of the current row: it yields the
 * value of the first column of the row located a given offset after the
 * current one inside the partition, or a default value when that row lies
 * outside the current window frame.
 */
@RaqlFunc(name="myLeadFunction")
public class Lead implements UserDefinedWindowFunctionAdapter<Serializable, Serializable> {

    /** Value produced when the caller supplies no explicit default. */
    private static final Serializable DEFAULT_DEFAULT_VALUE = null;

    /** Look-ahead distance used when the caller supplies no explicit offset. */
    private static final int DEFAULT_OFFSET = 1;

    protected final Type returnType;
    protected final Type[] parameterTypes;

    /**
     * Builds the adapter for the given parameter types. The result type is
     * taken from the first parameter type.
     *
     * @param parameterTypes the types of the function's input parameters
     */
    public Lead(Type[] parameterTypes) {
        this.parameterTypes = parameterTypes;
        this.returnType = parameterTypes[0];
    }

    /**
     * @return the result type, i.e. the type of the first parameter
     */
    @Override
    public Type getReturnType() {
        return returnType;
    }

    /**
     * @return the parameter types this adapter was constructed with
     */
    @Override
    public Type[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Supplies the initial state, which is {@code null} for this function.
     *
     * @return always {@code null}
     */
    @Override
    public Serializable createInitialState() {
        return null;
    }

    /**
     * Computes the lead value for the current row. Besides the column to read,
     * the function accepts two optional arguments: the offset (second column
     * value of the current entry) and the default value (third column value).
     *
     * @return the window function result wrapping either the first column
     *         value of the row at {@code currentIndex + offset}, or the
     *         default value when that index is below the frame's start index
     *         or not below its end index
     */
    @Override
    public WindowFunctionResult<Serializable, Serializable> call(Serializable state,
            PartitionEntry currentEntry, Partition partition, int currentIndex,
            WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {
        final Serializable[] args = currentEntry.getColumnValues();

        // Fall back to an offset of 1 when no offset argument was passed.
        final int lookAhead;
        if (args.length > 1) {
            lookAhead = (Integer) args[1];
        } else {
            lookAhead = DEFAULT_OFFSET;
        }

        final int targetIndex = currentIndex + lookAhead;
        final boolean insideFrame = targetIndex >= currentWindowSpec.getStartIndex()
                && targetIndex < currentWindowSpec.getEndIndex();

        final Serializable leadValue;
        if (insideFrame) {
            leadValue = partition.get(targetIndex).getColumnValue(0);
        } else if (args.length > 2) {
            // Target row is outside the frame: use the caller's default value.
            leadValue = args[2];
        } else {
            // No default value argument either: use the built-in default (null).
            leadValue = DEFAULT_DEFAULT_VALUE;
        }

        // The same value is handed over for both arguments of the factory.
        return Adapters.createWindowFunctionResult(leadValue, leadValue);
    }

    /**
     * Accepts every window specification; no compatibility check is performed.
     */
    @Override
    public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered,
            WindowFrameSpecification windowFrameSpecification)
            throws IncompatibleWindowSpecificationException {
        // nothing to be done
    }
}
To work with a specific record relative to the current record, this function uses both the index of the current record and the specification of the current window.