You implement the call method with the core logic for your window analytic function. The state parameter holds either the initial state (for the first window evaluation) or the state computed by the previous window evaluation.
Note: As a best practice, keep user-defined functions stateless wherever possible. Instead of storing intermediate state in the function itself, manage it for window calculations in the state property of the WindowFunctionResult.
The call method also takes parameters for the current row within the current partition, the partition itself, the index of the current row, and the specifications of the current window and the previous window. Together, these define the context for the function.
...
    /**
     * Computes the sum for the current window. Instead of simply summing
     * up the values of the current window, only the required values
     * from current and previous window are combined with the state of
     * the previous window evaluation. This approach allows for a more
     * efficient evaluation.
     *
     * @param state The state which is either the initial state for
     *        the first window evaluation or the state as
     *        computed during the previous window evaluation
     * @param currentEntry The current row of the partition being
     *        processed
     * @param partition The partition being processed
     * @param currentIndex The index of the current row being
     *        processed
     * @param currentWindowSpec The specification of the current window
     *        frame within the partition
     * @param previousWindowSpec The specification of the previous window
     *        frame within the partition
     * @return The result of the window function for the current window
     */
    @Override
    public WindowFunctionResult<Double, Double> call(Double state,
            PartitionEntry currentEntry, Partition partition, int currentIndex,
            WindowFrame currentWindowSpec, WindowFrame previousWindowSpec) {
        // initialize new sum with old sum from state
        Double newSum = state;
        // if previous window has values, remove any from the subtotal that
        // are not in current window
        boolean valuesInPreviousWindow = previousWindowSpec != null &&
                !previousWindowSpec.isEmpty();
        if (valuesInPreviousWindow) {
            final int removeTo = Math.min(currentWindowSpec.getStartIndex(),
                    previousWindowSpec.getEndIndex());
            for (int i = previousWindowSpec.getStartIndex(); i < removeTo; i++) {
                newSum -= ((Number) partition.get(i).getColumnValue(0)).doubleValue();
            }
        }
        // and add values from current window that are not in previous window
        final int addFrom = !valuesInPreviousWindow
                ? currentWindowSpec.getStartIndex()
                : Math.max(currentWindowSpec.getStartIndex(), previousWindowSpec.getEndIndex());
        for (int i = addFrom; i < currentWindowSpec.getEndIndex(); i++) {
            newSum += ((Number) partition.get(i).getColumnValue(0)).doubleValue();
        }
        // return the new sum as state for use in the next window
        // evaluation and as result for the current row
        return Adapters.createWindowFunctionResult(newSum, newSum);
    }

    @Override
    public void checkWindowSpecification(boolean isPartitioned, boolean isOrdered,
            WindowFrameSpecification windowFrameSpecification) throws
            IncompatibleWindowSpecificationException {
        // nothing to be done
    }
}
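To see how the incremental update in the call method behaves, the following minimal sketch replays the same subtract-and-add logic on a plain array. It does not use the product API: the Frame and Result classes and the slidingSums helper are hypothetical stand-ins introduced only for illustration. The sketch assumes that the end index of a frame is exclusive (as the loop conditions in the listing suggest) and that frames only move forward through the partition.

```java
import java.util.Arrays;

public class SlidingSumSketch {

    /** Hypothetical stand-in for a window frame covering rows [start, end). */
    static final class Frame {
        final int start;
        final int end; // assumed to be exclusive
        Frame(int start, int end) { this.start = start; this.end = end; }
        boolean isEmpty() { return end <= start; }
    }

    /** Hypothetical stand-in that pairs the next state with the row's result. */
    static final class Result {
        final double state;
        final double result;
        Result(double state, double result) { this.state = state; this.result = result; }
    }

    /** Subtracts rows that left the frame, then adds rows that entered it. */
    static Result slide(double state, double[] values, Frame current, Frame previous) {
        double sum = state;
        boolean hasPrevious = previous != null && !previous.isEmpty();
        if (hasPrevious) {
            int removeTo = Math.min(current.start, previous.end);
            for (int i = previous.start; i < removeTo; i++) {
                sum -= values[i];
            }
        }
        int addFrom = hasPrevious ? Math.max(current.start, previous.end) : current.start;
        for (int i = addFrom; i < current.end; i++) {
            sum += values[i];
        }
        // the new sum serves as both the next state and the current result
        return new Result(sum, sum);
    }

    /** Evaluates a frame of one preceding through one following row for every row. */
    static double[] slidingSums(double[] values) {
        double[] out = new double[values.length];
        double state = 0.0; // initial state for the first window evaluation
        Frame previous = null;
        for (int row = 0; row < values.length; row++) {
            Frame current = new Frame(Math.max(0, row - 1), Math.min(values.length, row + 2));
            Result r = slide(state, values, current, previous);
            out[row] = r.result;
            state = r.state;
            previous = current;
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [3.0, 6.0, 9.0, 12.0, 9.0]
        System.out.println(Arrays.toString(slidingSums(new double[] {1, 2, 3, 4, 5})));
    }
}
```

Each evaluation touches only the rows that entered or left the frame, so the work per row stays small even when the frame itself is wide.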
The call method returns an object that implements the WindowFunctionResult interface. A WindowFunctionResult instance contains the state to pass to the next window evaluation and the result for the current row.
Lastly, the example implements the checkWindowSpecification method, which the interface requires, as a no-op.
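If a function only makes sense for certain window specifications, the check does not have to be empty. The following sketch shows one possible shape of such a check, under stated assumptions: it rejects windows that are not ordered. It is not the product API. IncompatibleSpecException is a hypothetical stand-in for the real exception, whose constructors are not shown in this section, and the WindowFrameSpecification parameter is omitted because its methods are not shown here either.

```java
public class WindowSpecCheckSketch {

    /** Hypothetical stand-in for the checked exception declared by the interface. */
    static class IncompatibleSpecException extends Exception {
        IncompatibleSpecException(String message) {
            super(message);
        }
    }

    /**
     * Rejects window specifications without an ordering. The two flags mirror
     * the isPartitioned and isOrdered parameters of checkWindowSpecification.
     */
    static void check(boolean isPartitioned, boolean isOrdered)
            throws IncompatibleSpecException {
        if (!isOrdered) {
            throw new IncompatibleSpecException(
                    "This function requires an ordered window");
        }
        // partitioned and unpartitioned windows are both accepted in this sketch
    }

    /** Convenience wrapper that reports acceptance as a boolean. */
    static boolean isAccepted(boolean isPartitioned, boolean isOrdered) {
        try {
            check(isPartitioned, isOrdered);
            return true;
        } catch (IncompatibleSpecException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(true, true));  // prints true
        System.out.println(isAccepted(true, false)); // prints false
    }
}
```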