package com.raql.samples;
import com.jackbe.jbp.raql.udx.loader.RaqlFunc;
import de.rtm.push.adapters.Adapters;
import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;
import de.rtm.push.adapters.windowfunctions.WindowFrame;
import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;
import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;
import de.rtm.util.exception.IncompatibleWindowSpecificationException;
/**
 * User-defined window function adapter that computes the sum of the values
 * found in column index 0 of the rows covered by a window frame.
 *
 * <p>The sum is maintained incrementally: the result of the previous window
 * evaluation is carried along as state and only the rows that left or entered
 * the frame are subtracted or added.
 */
@RaqlFunc(name="mySumFunction")
public class WindowSum implements UserDefinedWindowFunctionAdapter<Double, Double> {

    /** Index of the column whose values are summed. */
    private static final int VALUE_COLUMN = 0;

    /** Parameter types handed to the constructor; returned unchanged by {@link #getParameterTypes()}. */
    protected final Type[] parameterTypes;

    /**
     * Creates the adapter and remembers the given parameter types.
     *
     * @param parameterTypes the types of the input parameters; the array is
     *                       stored by reference, not copied
     */
    public WindowSum(Type[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * Returns the type of the result, which is always {@code Type.DOUBLE}.
     */
    @Override
    public Type getReturnType() {
        return Type.DOUBLE;
    }

    /**
     * Returns the types of the input parameters exactly as they were passed
     * to the constructor.
     */
    @Override
    public Type[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Creates the initial state of the window sum, which is 0.
     */
    @Override
    public Double createInitialState() {
        return 0d;
    }

    /**
     * Computes the sum for the current window. Instead of summing up all
     * values of the current window, the state of the previous window
     * evaluation is adjusted: values that were part of the previous window
     * but precede the current one are subtracted, and values of the current
     * window that were not part of the previous one are added.
     *
     * <p>Both loops treat a frame's end index as exclusive.
     * NOTE(review): the incremental update presumably relies on frames only
     * moving forward within a partition (rows are removed at the front and
     * added at the back only) - confirm against the framework's frame
     * semantics.
     *
     * @param state The state which is either the initial state for
     *              the first window evaluation or the state as
     *              computed during the previous window evaluation;
     *              must not be {@code null} (it is unboxed immediately)
     * @param currentEntry The current row of the partition being
     *                     processed (not read by this implementation)
     * @param partition The partition being processed
     * @param currentIndex The index of the current row being
     *                     processed (not read by this implementation)
     * @param currentWindowSpec The specification of the current window
     *                          frame within the partition
     * @param previousWindowSpec The specification of the previous window
     *                           frame within the partition; may be
     *                           {@code null}, which is handled like an
     *                           empty previous window
     * @return The result of the window function for the current window; the
     *         new sum is used both as the state for the next evaluation and
     *         as the result for the current row
     */
    @Override
    public WindowFunctionResult<Double, Double> call(Double state,
            PartitionEntry currentEntry, Partition partition, int currentIndex,
            WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {
        // Unbox the previous sum once; accumulating in a boxed Double would
        // unbox and re-box (allocate) on every single addition/subtraction.
        double newSum = state;

        final boolean valuesInPreviousWindow =
                previousWindowSpec != null && !previousWindowSpec.isEmpty();
        final int currentStart = currentWindowSpec.getStartIndex();
        final int currentEnd = currentWindowSpec.getEndIndex();

        // Without a usable previous window everything in the current window is new.
        int addFrom = currentStart;

        if (valuesInPreviousWindow) {
            final int previousEnd = previousWindowSpec.getEndIndex();

            // Remove the values of the previous window that are not in the
            // current window any more.
            final int removeTo = Math.min(currentStart, previousEnd);
            for (int i = previousWindowSpec.getStartIndex(); i < removeTo; i++) {
                newSum -= ((Number) partition.get(i).getColumnValue(VALUE_COLUMN)).doubleValue();
            }

            // Rows before previousEnd are already contained in the carried-over sum.
            addFrom = Math.max(currentStart, previousEnd);
        }

        // Add the values of the current window that are not in the previous window.
        for (int i = addFrom; i < currentEnd; i++) {
            newSum += ((Number) partition.get(i).getColumnValue(VALUE_COLUMN)).doubleValue();
        }

        // Box once and hand the same value back as state for the next window
        // evaluation and as result for the current row.
        final Double boxedSum = Double.valueOf(newSum);
        return Adapters.createWindowFunctionResult(boxedSum, boxedSum);
    }

    /**
     * Accepts every window specification: the body is intentionally empty,
     * so this implementation never throws.
     *
     * @param isPartitioned whether the window is partitioned (not inspected)
     * @param isOrdered whether the window is ordered (not inspected)
     * @param windowFrameSpecification the frame specification (not inspected)
     * @throws IncompatibleWindowSpecificationException declared by the
     *         interface contract; never thrown here
     */
    @Override
    public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered,
            WindowFrameSpecification windowFrameSpecification)
            throws IncompatibleWindowSpecificationException {
        // nothing to be done
    }
}