Aggregating items in projections
An aggregate function calculates a single value over a window. If a select expression contains any aggregate functions, references to the input item can appear only in the arguments to those aggregate functions. An argument can be any EPL expression, except that it cannot contain another aggregate function. EPL provides several built-in aggregate functions, and you can define additional ones. See Defining custom aggregate functions.
Descriptions of built-in aggregate functions
EPL provides the following built-in aggregate functions. In the table, the argument names, for example, value and weight, are placeholders for expressions. Additional information about some of these functions follows the table.
| Aggregate function | Argument type(s) | Return type | Result description |
|---|---|---|---|
| avg(value) | decimal or float | decimal or float | The arithmetic mean of the values in the window. The avg() and mean() functions do exactly the same thing. They are aliases for each other. |
| count() | - | integer | The number of items in the window, including any NaN items. |
| count(predicate) | boolean | integer | The number of items for which the argument is true. |
| count(value) | decimal or float | integer | The number of items where the decimal or float value is not NaN. |
| first(value) | decimal or float | decimal or float | The earliest value in the window being aggregated over. |
| last(value) | decimal or float | decimal or float | The latest value in the window being aggregated over. |
| max(value) | decimal or float | decimal or float | The maximum value. |
| mean(value) | decimal or float | decimal or float | The arithmetic mean of the values in the window. The mean() and avg() functions do exactly the same thing. They are aliases for each other. |
| min(value) | decimal or float | decimal or float | The minimum value. |
| nth(value,index) | (decimal, integer) or (float, integer) | decimal or float | The value of the specified decimal or float item in the index position, starting with the earliest item in the window (item 0) and moving toward the latest item. nth(value,0) returns the same item as first(value). |
| prior(value,index) | (decimal, integer) or (float, integer) | decimal or float | The value of the specified decimal or float item in the index position, starting with the most recent item in the window (item 0) and moving toward the earliest item. prior(value,0) returns the same item as last(value). |
| stddev(value) | decimal or float | decimal or float | The standard deviation of the values. |
| stddev2(value) | decimal or float | decimal or float | The sample standard deviation of the values. |
| sum(value) | decimal, float or integer | decimal, float or integer | The sum of the values. |
| wavg(value,weight) | (decimal, decimal) or (float, float) | decimal or float | The weighted average of the values, where each value is weighted by the corresponding weight. |
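To make the difference between stddev(), stddev2() and wavg() concrete, here is a small Python model (not EPL). It assumes that stddev() is the population standard deviation (dividing by n), that stddev2() divides by n - 1, and that wavg() is the sum of value times weight divided by the sum of the weights. The table does not spell out these formulas, so treat them as assumptions.

```python
import math
import statistics


def wavg(values, weights):
    """Weighted average, assumed to be sum(v * w) / sum(w)."""
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


window = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

# Assumed counterpart of stddev(): divide the squared deviations by n.
population_sd = statistics.pstdev(window)

# Assumed counterpart of stddev2(): divide the squared deviations by n - 1.
sample_sd = statistics.stdev(window)

# The value 20.0 carries three times the weight of the value 10.0.
weighted = wavg([10.0, 20.0], [1.0, 3.0])

print(population_sd, sample_sd, weighted)
```

The sample standard deviation is always a little larger than the population one for the same window, which is why the choice between the two functions matters for small windows.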
Calculations by the built-in aggregate functions might be affected by underflow and overflow. For example, if you add a very large number to the collection that the sum() function operates on, then add a very small number, and then remove the very large number, the result is probably 0.0 rather than the very small number. Adding only the very small number gives the result you would expect. As with the rest of EPL, the overflow and underflow characteristics are as defined for IEEE 64-bit floating point numbers.
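The effect described above can be reproduced with any IEEE 64-bit arithmetic. The following Python sketch uses a hypothetical RunningSum class that adds arriving items and subtracts departing ones. Whether the correlator maintains sum() this way is an assumption; the point is only to show how the small value is lost.

```python
class RunningSum:
    """Hypothetical incremental sum over a window (an assumption, not Apama code)."""

    def __init__(self):
        self.total = 0.0

    def add(self, x):
        # An item enters the window.
        self.total += x

    def remove(self, x):
        # An item leaves the window.
        self.total -= x


s = RunningSum()
s.add(1e20)     # very large number enters
s.add(1.0)      # much smaller number enters; it is below the precision of 1e20
s.remove(1e20)  # very large number leaves
lossy = s.total

t = RunningSum()
t.add(1.0)      # the smaller number on its own
exact = t.total

print(lossy, exact)
```

In 64-bit floating point, 1e20 + 1.0 rounds back to 1e20, so removing 1e20 leaves 0.0 instead of 1.0.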
Overloaded functions
The sum() function is overloaded. You can specify a decimal, float or integer. The return type matches the argument type.
The avg(), first(), last(), max(), mean(), min(), nth(), prior(), stddev(), and stddev2() functions are overloaded. You can specify a decimal or a float. The return type matches the argument type.
The count() function is overloaded. You can specify no argument, a boolean, a decimal or a float. The return type is always an integer.
The wavg() function is overloaded. You can specify two decimal arguments or two float arguments. The return type is a decimal or a float, respectively.
Enabling use of built-in aggregate functions
The built-in aggregate functions reside in the com.apama.aggregates package. To use a built-in aggregate function in a query you must do one of the following:
* Specify the full name of the aggregate function. For example:
select com.apama.aggregates.sum(x)
* For each aggregate function you want to use in your code, add a using statement. This lets you specify aggregate function names without specifying the package name. For example:
using com.apama.aggregates.mean;
using com.apama.aggregates.stddev;
...
...select MeanSD( mean(s), stddev(s) );
Insert the using statement after the optional package declaration and before any other declarations in the .mon file.
Operating on empty windows
Except for the sum() and count() functions, if the window being aggregated over is empty or insufficiently large then the result is not-a-number (NaN). The sum() and count() functions return zero if the window is empty.
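The empty-window rule can be modelled in a few lines of Python (not EPL). The function names are hypothetical. Treating a one-item window as "insufficiently large" for the sample standard deviation is an assumption used to illustrate that phrase.

```python
import math


def agg_sum(window):
    # sum() returns zero for an empty window.
    return sum(window, 0.0)


def agg_count(window):
    # count() returns zero for an empty window.
    return len(window)


def agg_mean(window):
    # Other aggregates return NaN when there is nothing to aggregate.
    return sum(window) / len(window) if window else math.nan


def agg_max(window):
    return max(window) if window else math.nan


def agg_stddev2(window):
    # Assumption: a sample standard deviation needs at least two items.
    if len(window) < 2:
        return math.nan
    m = sum(window) / len(window)
    return math.sqrt(sum((x - m) * (x - m) for x in window) / (len(window) - 1))


empty = []
print(agg_sum(empty), agg_count(empty), agg_mean(empty), agg_max(empty))
```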
IEEE special values in aggregate functions
Several of the built-in aggregate functions take decimal or float arguments. It is possible for a decimal or float value to be one of the following:
* Positive infinity
* Negative infinity
* Not-a-number (NaN)
* A finite number
The four positional aggregates (first(), last(), nth() and prior()) do not inspect the values they operate on. They return the selected item regardless of its value. If the selected item does not exist (for example, selecting the fifth item from a window of three items), the aggregate returns NaN. The index for nth() and prior() must not be negative. If it is, the correlator terminates the monitor instance.
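As an illustration of the positional rules, here is a Python model (not EPL) of nth() and prior() over a window stored from earliest to latest. Raising ValueError stands in for the correlator terminating the monitor instance. That mapping is only an analogy.

```python
import math


def nth(window, index):
    """Item at `index`, counting from the earliest item (index 0)."""
    if index < 0:
        raise ValueError("index must not be negative")
    return window[index] if index < len(window) else math.nan


def prior(window, index):
    """Item at `index`, counting back from the latest item (index 0)."""
    if index < 0:
        raise ValueError("index must not be negative")
    return window[len(window) - 1 - index] if index < len(window) else math.nan


window = [1.5, math.inf, 3.0]  # earliest to latest

print(nth(window, 0), prior(window, 0))  # same as first() and last()
print(nth(window, 1))                    # returned as-is, even though it is infinite
print(nth(window, 4))                    # no fifth item, so NaN
```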
All the remaining (arithmetic) aggregate functions that take float or decimal arguments ignore any NaN items that are in the window being aggregated. The result is the aggregate of the window without the NaN items. If you want to count all items including NaN items then use the count() aggregate function that takes no arguments.
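The following Python model (not EPL) shows the difference between counting all items and counting only non-NaN values, and how an arithmetic aggregate skips NaN items. The helper names are hypothetical.

```python
import math


def non_nan(window):
    # Arithmetic aggregates work on the window with NaN items left out.
    return [x for x in window if not math.isnan(x)]


def agg_mean(window):
    items = non_nan(window)
    return sum(items) / len(items) if items else math.nan


def count_values(window):
    # Models count(value): NaN items are not counted.
    return len(non_nan(window))


def count_all(window):
    # Models count() with no argument: every item is counted.
    return len(window)


window = [10.0, math.nan, 20.0]
print(agg_mean(window), count_values(window), count_all(window))
```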
The behavior of arithmetic aggregate functions over windows that contain positive or negative infinities depends on the particular function. The result is an infinity, NaN or a finite value. The following table shows which aggregate function gives which result for three kinds of window: one that contains one or more positive infinities and no negative infinities (+Inf), one that contains one or more negative infinities and no positive infinities (-Inf), and one that contains at least one positive and at least one negative infinity (Both). For the wavg() function, the result depends on whether the infinity is the value or the weight.
| Input | Outputs +Inf | Outputs -Inf | Outputs NaN | Outputs finite value |
|---|---|---|---|---|
| +Inf | max(), mean(), sum(), wavg(value) | - | stddev(), wavg(weight) | min() |
| -Inf | - | mean(), min(), sum(), wavg(value) | stddev(), wavg(weight) | max() |
| Both | max() | min() | mean(), stddev(), sum(), wavg() | - |
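These results follow from ordinary IEEE arithmetic, where infinity minus infinity and infinity divided by infinity are both NaN. The Python sketch below (not EPL) uses textbook formulas for mean, standard deviation and weighted average. Those formulas are assumptions about how the built-in functions behave, not taken from the Apama implementation.

```python
import math

INF = math.inf


def mean(window):
    return sum(window) / len(window)


def stddev(window):
    m = mean(window)
    # With an infinite mean, (inf - inf) yields NaN, which propagates.
    return math.sqrt(sum((x - m) * (x - m) for x in window) / len(window))


def wavg(values, weights):
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


pos = [1.0, INF]         # positive infinity only
both = [1.0, INF, -INF]  # both infinities

print(max(pos), mean(pos), sum(pos), min(pos))
print(stddev(pos))
print(wavg(pos, [1.0, 1.0]))         # the infinity is a value
print(wavg([1.0, 2.0], [1.0, INF]))  # the infinity is a weight: inf / inf
print(max(both), min(both), sum(both), mean(both))
```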
Grouping output items
In a select clause, when you do not specify a group by clause, any aggregate functions in the projection operate on all values in the window. This is true even if you partitioned the window. To divide the items in the window into separate groups and calculate an aggregate value for each group, use the group by clause. The syntax of the group by clause is as follows:
group by groupByExpr[, groupByExpr]...
Each groupByExpr is an expression that returns a value of a comparable type.
See Comparable types in the "Types" section of the EPL Reference.
These expressions form the group key, which determines which group each output item is a part of. Any aggregate functions in the select expression operate over each group separately.
In an aggregate projection, you can refer to any group key expressions anywhere in the select expression. However, you can refer to a query input item only in an aggregate function argument. For example:
from t in all Tick() within 30.0
   group by t.symbol select TickAverage(t.symbol, mean(t.price));
Whenever a lot arrives, this query updates one or more groups. Every group that is updated outputs a TickAverage event, and all TickAverage events are in the same lot. Each TickAverage event contains the symbol and the average price for that symbol over the last thirty seconds. If a group is not updated, it does not output a TickAverage event.
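The rule that only updated groups produce output can be sketched in Python (not EPL). The function name and the tuple representation of ticks are hypothetical, and the model ignores items leaving the 30-second window, which would also update a group.

```python
from collections import defaultdict


def updated_group_means(window, lot):
    """Return {symbol: mean price} for the groups touched by the arriving lot.

    window: (symbol, price) pairs already in the window
    lot:    (symbol, price) pairs that arrive together
    """
    groups = defaultdict(list)
    for symbol, price in window + lot:
        groups[symbol].append(price)

    # Only groups that received an item in this lot produce output.
    touched = {symbol for symbol, _ in lot}
    return {s: sum(groups[s]) / len(groups[s]) for s in touched}


window = [("APMA", 10.0), ("SOW", 5.0)]
lot = [("APMA", 20.0)]

print(updated_group_means(window, lot))
```

The SOW group is unchanged by this lot, so the model emits nothing for it, matching the behavior described above.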
You typically use a group by clause in a stream query in conjunction with a partition by clause. In the following example, the window contains up to 10 events for each stock symbol. The aggregate projection calculates the average price separately for each symbol and each average is based on up to 10 events:
from t in ticks partition by t.symbol retain 10
   group by t.symbol select mean(t.price);
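To see how partitioning and grouping combine, the following Python model (not EPL) keeps up to 10 prices per symbol and computes the mean for the symbol that was just updated. The class name PartitionedMean is hypothetical, and the model processes one tick at a time rather than lots.

```python
from collections import defaultdict, deque


class PartitionedMean:
    """Models `partition by symbol retain N` followed by `group by symbol`."""

    def __init__(self, retain=10):
        # One bounded window per symbol; the oldest price is dropped when full.
        self.partitions = defaultdict(lambda: deque(maxlen=retain))

    def on_tick(self, symbol, price):
        part = self.partitions[symbol]
        part.append(price)
        # The mean is calculated per symbol, over at most `retain` prices.
        return sum(part) / len(part)


query = PartitionedMean(retain=10)

last_a = None
for price in range(1, 13):            # 12 ticks for symbol "A": 1.0 .. 12.0
    last_a = query.on_tick("A", float(price))

first_b = query.on_tick("B", 100.0)   # "B" has its own partition and group

print(last_a, first_b)
```

Without the partition, a retain 10 window would hold the last 10 ticks across all symbols, so a busy symbol could push the others out of the window.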
Obtaining the query’s remove stream
For each query, there are items that have been added to the window in a given query activation and items that have been removed (they were previously in the window, but are no longer in the window). By default, a simple, non-aggregate projection returns the items that have been added to the window. This is the istream. To obtain the items that have been removed from the window, add the rstream keyword to the select clause.
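The two streams can be pictured with a small Python model (not EPL) of a size-based window. The activate function is hypothetical. On each activation it returns the items added to the window (the istream) and the items pushed out of it (the rstream).

```python
from collections import deque


def activate(window, item, retain):
    """Add `item` to a retain-N window and report what entered and what left."""
    window.append(item)
    removed = []
    while len(window) > retain:
        # Items that no longer fit in the window form the remove stream.
        removed.append(window.popleft())
    return [item], removed


window = deque()
for value in (1, 2, 3):
    istream, rstream = activate(window, value, retain=2)
    print(value, istream, rstream)
```

The third activation is the first to report a removed item, because the window only then exceeds its size of two.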
For aggregate projections, obtaining the rstream is not meaningful and therefore the rstream keyword is not allowed in aggregate projections.
For examples of specifying rstream, see Defining time-based windows, Defining size-based windows, Defining cross-joins with two from clauses and Defining equi-joins with the join clause.
When you specify retain all you cannot specify rstream.
Copyright © 2013 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or Terracotta Inc., San Francisco, CA, USA, and/or Software AG (Canada) Inc., Cambridge, Ontario, Canada, and/or, Software AG (UK) Ltd., Derby, United Kingdom, and/or Software A.G. (Israel) Ltd., Or-Yehuda, Israel and/or their licensors.