// Complete WindowSum Example

package com.raql.samples;

import com.jackbe.jbp.raql.udx.loader.RaqlFunc;

import de.rtm.push.adapters.Adapters;

import de.rtm.push.adapters.windowfunctions.UserDefinedWindowFunctionAdapter;

import de.rtm.push.adapters.windowfunctions.WindowFrame;

import de.rtm.push.adapters.windowfunctions.WindowFrameSpecification;

import de.rtm.push.adapters.windowfunctions.WindowFunctionResult;

import de.rtm.util.exception.IncompatibleWindowSpecificationException;

/**
 * Window function adapter that computes the sum of a window.
 *
 * <p>The summed value is read from column 0 of each partition entry and
 * converted via {@link Number#doubleValue()}. Both the carried state and the
 * per-row result are the running sum as a {@code Double}.
 *
 * <p>Window frames are treated as half-open index ranges
 * {@code [getStartIndex(), getEndIndex())} into the partition.
 */
@RaqlFunc(name="mySumFunction")
public class WindowSum implements UserDefinedWindowFunctionAdapter<Double, Double> {

    /**
     * Parameter types handed to the constructor; returned as-is by
     * {@link #getParameterTypes()}.
     */
    protected final Type[] parameterTypes;

    /**
     * Creates the adapter for the given parameter types.
     *
     * @param parameterTypes the types of the input parameters; the array is
     *                       stored without copying
     */
    public WindowSum(Type[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * Returns the type of the result, which is always {@code Type.DOUBLE}.
     */
    @Override
    public Type getReturnType() {
        return Type.DOUBLE;
    }

    /**
     * Returns the types of the input parameters as passed to the constructor.
     */
    @Override
    public Type[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Creates the initial state of the window sum, which is 0.
     */
    @Override
    public Double createInitialState() {
        return 0d;
    }

    /**
     * Computes the sum for the current window. Instead of summing up all
     * values of the current window, the state of the previous evaluation is
     * reused: values that dropped out of the window are subtracted and values
     * that newly entered it are added.
     *
     * <p>If there is nothing to reuse (no previous frame, an empty previous
     * frame, or a {@code null} state) the sum is rebuilt from
     * {@link #createInitialState()} over the whole current frame, so rounding
     * residue from earlier incremental updates is not carried forward.
     *
     * @param state              The state which is either the initial state
     *                           for the first window evaluation or the state
     *                           as computed during the previous window
     *                           evaluation
     * @param currentEntry       The current row of the partition being
     *                           processed (not used by this function)
     * @param partition          The partition being processed
     * @param currentIndex       The index of the current row being processed
     *                           (not used by this function)
     * @param currentWindowSpec  The specification of the current window frame
     *                           within the partition
     * @param previousWindowSpec The specification of the previous window
     *                           frame within the partition; may be
     *                           {@code null}
     * @return The result of the window function for the current window; the
     *         same sum is used as next state and as row result
     */
    @Override
    public WindowFunctionResult<Double, Double> call(Double state,
            PartitionEntry currentEntry, Partition partition, int currentIndex,
            WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {

        final int currentStart = currentWindowSpec.getStartIndex();
        final int currentEnd = currentWindowSpec.getEndIndex();

        // The previous sum is only meaningful if the previous frame actually
        // contained rows and a state was handed in.
        final boolean reusePreviousSum = state != null
                && previousWindowSpec != null
                && !previousWindowSpec.isEmpty();

        // Primitive accumulator: avoids unboxing/re-boxing a Double on every
        // loop iteration.
        double sum;
        int addFrom = currentStart;

        if (reusePreviousSum) {
            sum = state;
            final int previousEnd = previousWindowSpec.getEndIndex();

            // NOTE(review): only rows before the current start are removed and
            // only rows at/after the previous end are added, so this relies on
            // frames never moving backwards within a partition - confirm
            // against the engine's frame semantics.

            // Remove rows of the previous frame that lie before the current one.
            final int removeTo = Math.min(currentStart, previousEnd);
            for (int i = previousWindowSpec.getStartIndex(); i < removeTo; i++) {
                sum -= valueAt(partition, i);
            }

            // Rows up to the previous end are already part of the sum.
            addFrom = Math.max(currentStart, previousEnd);
        } else {
            // Nothing to reuse: restart from the initial state.
            sum = createInitialState();
        }

        // Add rows of the current frame that were not in the previous one.
        for (int i = addFrom; i < currentEnd; i++) {
            sum += valueAt(partition, i);
        }

        // Return the new sum as state for the next window evaluation and as
        // result for the current row.
        final Double newSum = sum;
        return Adapters.createWindowFunctionResult(newSum, newSum);
    }

    /**
     * Accepts every window specification; no compatibility checks are
     * performed.
     */
    @Override
    public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered,
            WindowFrameSpecification windowFrameSpecification)
            throws IncompatibleWindowSpecificationException {
        // nothing to be done
    }

    /** Reads column 0 of the partition row at {@code index} as a double. */
    private static double valueAt(Partition partition, int index) {
        return ((Number) partition.get(index).getColumnValue(0)).doubleValue();
    }

}