Apama 10.15.1 | Developing Apama Applications | Developing Apama Applications in EPL | Working with Streams and Stream Queries | Defining custom aggregate functions | Distinguishing duplicate values in custom aggregate functions
 
Distinguishing duplicate values in custom aggregate functions
Each item in a stream is considered to be unique. However, when duplicate values appear in the set of items that a custom aggregate function operates on, it is not possible for the function to identify the particular instance of the value. If your implementation requires being able to distinguish between instances of duplicate values, you can accomplish this by extending the signatures of the function's add() and remove() actions.
For example, you might see the following set of float values in a stream:
1.0   2.0   3.0   4.0   3.0   2.0   1.0
Each occurrence of a particular value in the stream represents an individual value, separate from any other occurrences of that value. But when a query presents these values to a custom aggregate function (by means of the add() and remove() actions), the value alone is not enough to identify the particular occurrence that this value represents.
To distinguish one occurrence from another, extend the action signatures as follows:
*The add() action can return a value, which can be of any type.
*If the add() action does return a value, then the remove() action must accept, as its last argument in addition to its standard arguments, an argument of the same type as that returned by the add() action.
When an item is added to the aggregate, the value returned by the add() action is stored with the item. When that item is removed from the aggregate, the same value will be passed to the remove() action. Thus, it is possible to distinguish between items with duplicate values by comparing the additional data that is passed to the remove() action.
The following example shows an aggregate function that returns the entire window contents, in order, as a sequence:
aggregate windowOf(float f) returns sequence<float> {
dictionary<integer,float> d;
integer i;

action init() { d.clear(); i := 0; }
action add(float f) returns integer {
i := i+1;
d[i] := f;
return i;
}
action remove(float f, integer k) { d.remove(k); }
action value() returns sequence<float> { return d.values(); }
}