Apama 10.15.0 | Developing Apama Applications | EPL Reference | Aggregate Functions | Custom aggregates
 
Custom aggregates
In a stream query, you can specify an aggregate function in the select clause. If one of the supplied aggregate functions does not meet your needs, you can define a custom aggregate function for use in a select clause.
You define custom aggregate functions in a .mon file and outside of an event or a monitor. The aggregate function's scope is the package in which you declare it. To use custom aggregate functions in monitors in other packages, specify the aggregate function's fully-qualified name, for example:
from a in all A() select com.myCorporation.custom.myCustomAggregate(a)
Alternatively, you can specify a using statement. See The using declaration.
Specify bounded when you are defining a custom aggregate function that will work with only a bounded window. That is, a stream query cannot specify retain all. Specify unbounded when you are defining a custom aggregate function that will work with only an unbounded window. That is, a stream query must specify retain all. Do not specify either bounded or unbounded when you are defining a custom aggregate function that will work with either a bounded or an unbounded window.
The name of a custom aggregate function must be unique within a package; you cannot overload it or define an event or monitor with the same name as an aggregate function.
The list of formal parameters consists of zero or more comma-separated type/name pairs. Each pair indicates the type and the name of an argument that you are passing to the aggregate function. For example, (float price, integer quantity).
The data type name must be an EPL type. This is the type of the value that your aggregate function returns.
The body of a custom aggregate function can contain fields that are specific to one instance of the custom aggregate function and actions to operate on the state.
Actions
In a custom aggregate function, the init(), add(), remove() and value() actions are special. They define how stream queries interact with custom aggregate functions.
*init() — If a custom aggregate function defines an init() action, it must take no arguments and must not return a value. The correlator executes the init() action once for each new aggregate function instance it creates in a stream query.
*add() — A custom aggregate function must define an add() action. The add() action must take the same ordered set of arguments that are specified in the custom aggregate function signature. That is, the names, types, and order of the arguments must all be the same. The correlator executes the add() action once for each item added to the set of items that the aggregate function is operating on.
*remove() — A bounded aggregate function must define a remove() action. An unbounded aggregate function must not define a remove() action. If you do not specify either bounded or unbounded, the remove() action is optional. The remove() action must take the same ordered set of arguments as the add() action, followed by an argument of the type returned by add(), if any, and must not return a value. The correlator executes the remove() action once for each item that leaves the set of items that the aggregate function is operating on. The value that remove() is called with is the same value that add() was called with.
*value() — All custom aggregate functions must define a value() action. The value() action must take no arguments and its return type must match the return type in the aggregate function signature. In a stream query, the correlator executes the value() action once per batch per group, and returns the current aggregate value to the query.
Custom aggregate functions can declare other actions, including actions that are executed by the above named actions. A custom aggregate function cannot contain a field whose name is onBeginRecovery, onConcludeRecovery, init, add, value, or remove, even if, for example, the custom aggregate function does not define a remove() action.
Fields
In the body of a custom aggregate function, you can define fields that are specific to the custom aggregate instance they are in.