Work with Specific Records in Window Calculations

This example is a window analytics function similar to the built-in MashZone NextGen lead function, which returns the value of the specified column from the row (record) that follows the current row by a specific offset:

package com.raql.samples;

import java.io.Serializable;

import com.jackbe.jbp.raql.udx.loader.RaqlFunc;

import de.rtm.push.adapters.Adapters;

import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;

import de.rtm.push.adapters.windowfunctions.WindowFrame;

import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;

import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;

import de.rtm.util.exception.IncompatibleWindowSpecificationException;

/**
 * Window function adapter, annotated with the RAQL function name
 * {@code myLeadFunction}, that looks up the value of the first column in the
 * row located a given offset after the current row of the partition.
 *
 * <p>The column values of the current entry are read as follows:
 * <ul>
 *   <li>index 0 - the column whose lead value is returned,</li>
 *   <li>index 1 (optional) - the offset, cast to {@code Integer};
 *       defaults to {@code 1},</li>
 *   <li>index 2 (optional) - the value returned when the target row lies
 *       outside the current window; defaults to {@code null}.</li>
 * </ul>
 */
@RaqlFunc(name="myLeadFunction")
public class Lead implements UserDefinedWindowFunctionAdapter<Serializable, Serializable> {

    /** Fallback result used when no explicit default value is supplied. */
    private static final Serializable DEFAULT_DEFAULT_VALUE = null;

    /** Offset used when no explicit offset is supplied. */
    private static final int DEFAULT_OFFSET = 1;

    /** Result type; taken from the first parameter type. */
    protected final Type returnType;

    /** Types of the input parameters, as handed to the constructor. */
    protected final Type[] parameterTypes;

    /**
     * Creates the adapter. The return type is the type of the first parameter.
     *
     * @param parameterTypes the types of the input parameters; the first
     *        element is read, so the array must not be empty
     */
    public Lead(Type[] parameterTypes) {
        this.returnType = parameterTypes[0];
        this.parameterTypes = parameterTypes;
    }

    /**
     * @return the type of the result, i.e. the type of the first parameter
     */
    @Override
    public Type getReturnType() {
        return returnType;
    }

    /**
     * @return the parameter types array passed to the constructor (not copied)
     */
    @Override
    public Type[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Creates the initial state.
     *
     * @return always {@code null}
     */
    @Override
    public Serializable createInitialState() {
        return null;
    }

    /**
     * Determines the lead value for the current row. The offset and the
     * default value are optional; see the class description for how they are
     * read from the current entry.
     *
     * <p>The target index is {@code currentIndex + offset}. It is treated as
     * inside the window when it is at least
     * {@code currentWindowSpec.getStartIndex()} and strictly less than
     * {@code currentWindowSpec.getEndIndex()}.
     *
     * @param state              not read by this implementation
     * @param currentEntry       the current row; supplies the column values
     * @param partition          the partition from which the target row is fetched
     * @param currentIndex       index of the current row
     * @param currentWindowSpec  supplies the start and end index of the current window
     * @param previousWindowSpec not read by this implementation
     * @return the result of the window function for the current window; the
     *         computed value is passed as both arguments of
     *         {@code Adapters.createWindowFunctionResult}
     */
    @Override
    public WindowFunctionResult<Serializable, Serializable> call(Serializable state,
            PartitionEntry currentEntry, Partition partition, int currentIndex,
            WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {
        final Serializable[] args = currentEntry.getColumnValues();

        // Fall back to the default offset (1) when none was supplied.
        int offset = DEFAULT_OFFSET;
        if (args.length > 1) {
            offset = (Integer) args[1];
        }
        final int targetIndex = currentIndex + offset;

        final boolean insideWindow = targetIndex >= currentWindowSpec.getStartIndex()
                && targetIndex < currentWindowSpec.getEndIndex();

        final Serializable leadValue;
        if (insideWindow) {
            leadValue = partition.get(targetIndex).getColumnValue(0);
        } else {
            // Target row is outside the window: use the supplied default
            // value, or null when none was supplied.
            leadValue = args.length > 2 ? args[2] : DEFAULT_DEFAULT_VALUE;
        }

        return Adapters.createWindowFunctionResult(leadValue, leadValue);
    }

    /**
     * Accepts every window specification; no check is performed.
     *
     * @throws IncompatibleWindowSpecificationException declared by the
     *         signature but never thrown by this implementation
     */
    @Override
    public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered,
            WindowFrameSpecification windowFrameSpecification)
            throws IncompatibleWindowSpecificationException {
        // nothing to be done
    }
}

To work with a specific record relative to the current record, this function uses both the index of the current record and the specification of the current window.