Apama 10.7.2 | Developing Apama Applications | Developing Apama Applications in EPL | Defining Queries | Defining query input | Using source timestamps of events
 
Using source timestamps of events
By default, the query runtime treats events as occurring in the order in which they are processed, and takes the time of each event to be the correlator's time at the point the event is processed. This is suitable if events are delivered to the Apama correlator reliably, quickly and in order. However, if events are delayed, accumulated into batches before being sent, or delivered over unreliable networks, it may be necessary to use the time at which each event happened at the event source. For queries to use this source timestamp, it must be available in the event itself. For example, a car may measure the engine's temperature, RPM and other important statistics along with a timestamp, and record these in a small computer in the car. Periodically, when the car is connected to a wireless network, it sends this data as a batch of events. For queries that make use of the time or ordering of events to behave correctly, they must be configured to use the source timestamp.
Note:
Source timestamps are not intended to replace Xclock, although the two can be used together for testing purposes. Xclock controls the correlator's time (see Disabling the correlator's internal clock), whereas source timestamps indicate the time at which an event occurred.
To use source timestamps:
*Every event which may be delayed should contain the source timestamp in some form.
*An action must be defined on the event definition that takes no parameters and returns a float. It should calculate the source time of the event (typically the time the event occurred) from the fields of the event. The return value of the action should be the time in seconds since the epoch (midnight, 1 Jan 1970 UTC). If the event already contains the time in seconds since the epoch (in this example, in a field named sourceTime), the action can be as simple as the following:
action getSourceTime() returns float { return sourceTime; }
Otherwise, the TimeFormat event library can help convert from a time of day and date, and perform timezone conversions as necessary. For example, if the source timestamps in your events are not in UTC, one approach is to include a timezone field and use the TimeFormat event's parseTimeWithTimeZone action to obtain the source time in the correct form, as shown in the following event definition:
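using com.apama.correlator.timeformat.TimeFormat;
using com.apama.queries.TimeFrom;

@TimeFrom("getSourceTime")
event E {
    integer k;
    string sourceTime; // time of day at the source
    string timeZone;   // timezone name for sourceTime

    action getSourceTime() returns float {
        TimeFormat timeFormat := TimeFormat();
        // Pattern letters follow Java SimpleDateFormat: HH = hour (0-23),
        // mm = minutes, ss = seconds ("MM" would mean the month).
        return timeFormat.parseTimeWithTimeZone("HH:mm:ss", sourceTime,
            timeZone);
    }
}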
See also Using the TimeFormat Event Library.
*The event definition should have a @TimeFrom annotation, as in the above example (see also Adding predefined annotations), or queries that use the event as an input must specify a time from clause that names the action that provides the source time. In either case, queries must always specify a maximum time to wait for the events (see Waiting for delayed events). If the event has a @TimeFrom annotation and the query also specifies a time from clause, the time from clause in the query takes precedence.
Note:
You cannot use the event's built-in getTime() method to provide the source time. That method returns the time at which the correlator processed or created the event, which defeats the purpose of source timestamps.
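Whatever the event's fields look like, the source time action has to reduce them to a float number of seconds since the epoch. The following Python sketch is not EPL and does not use the TimeFormat API; it only illustrates that arithmetic, assuming a hypothetical event that carries a local "YYYY-MM-DD HH:MM:SS" string plus a fixed UTC offset in hours (rather than a named timezone as in the EPL example above). The function name source_time is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def source_time(local_timestamp: str, utc_offset_hours: float) -> float:
    """Convert a local 'YYYY-MM-DD HH:MM:SS' string recorded at a fixed UTC
    offset into seconds since the epoch (1970-01-01 00:00:00 UTC)."""
    naive = datetime.strptime(local_timestamp, "%Y-%m-%d %H:%M:%S")
    # Attach the source's fixed offset so the conversion to UTC is explicit.
    aware = naive.replace(tzinfo=timezone(timedelta(hours=utc_offset_hours)))
    return aware.timestamp()


# 01:00 at UTC+1 is midnight UTC, which is the epoch itself.
print(source_time("1970-01-01 01:00:00", 1.0))
```

Returning a float keeps sub-second precision, which matches the float return type the action must have.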
Waiting for delayed events
When source timestamps are used, events are assumed to be potentially delayed between the source time at which they occur and the time the Apama correlator processes them. If the correlator receives no events, it needs to distinguish between no events having occurred and events being delayed. If events may be delayed, the query runtime waits before evaluating the query, as it does not yet have a definitive view of all of the events. A query that uses source timestamps must specify the maximum time that it will wait before processing events. This is the maximum delay that the query tolerates, and therefore also the maximum delay between an event occurring and the query processing that event. The wait time is inclusive: an event delayed by exactly the value specified in the wait clause is still considered valid.
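The inclusive wait rule amounts to a single comparison between an event's delay and the wait time. This is a minimal Python sketch under assumed conditions: times are float seconds, and within_wait is a hypothetical name, not an Apama API.

```python
def within_wait(source_time: float, processing_time: float, wait: float) -> bool:
    """Return True if an event can still be used: its delay (processing time
    minus source time) is at most the wait time. The comparison is inclusive."""
    delay = processing_time - source_time
    return delay <= wait


WAIT = 2 * 60 * 60.0  # a "wait 2 hours" clause, in seconds
print(within_wait(0.0, WAIT, WAIT))        # delayed by exactly the wait time
print(within_wait(0.0, WAIT + 1.0, WAIT))  # delayed by one second more
```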
The maximum wait time must be specified and should be set to a reasonable value: a larger wait time can increase the number of events stored by the query runtime, and processing of the query may be delayed by up to that duration. The maximum wait time for an input may be shorter or longer than the within duration, but should not correspond to a large number of events at the typical event rate for that input.
The wait time is specified in a query by the wait clause of an input declaration. The wait clause can specify a time either as a time literal (using days, hours, minutes and seconds) or as a float expression. Either both the source timestamp action and the wait clause must be specified, or neither. The source timestamp action can be specified by a time from clause in the query or by a @TimeFrom annotation on the event type definition.
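The configuration rules for a single input (a time from clause overrides a @TimeFrom annotation, and a source time action and a wait clause go together or not at all) can be modeled as a small validation function. This Python sketch is illustrative only; resolve_time_from and its parameters are hypothetical names, not part of Apama.

```python
from typing import Optional


def resolve_time_from(time_from_clause: Optional[str],
                      time_from_annotation: Optional[str],
                      wait_seconds: Optional[float]) -> Optional[str]:
    """Return the action that supplies the source time for one query input,
    or None if the input uses the correlator's current time instead.

    Hypothetical model of two documented rules:
    * a time from clause in the query takes precedence over @TimeFrom;
    * a source time action and a wait clause are given together or not at all.
    """
    if time_from_clause is not None:
        action = time_from_clause
    else:
        action = time_from_annotation
    if (action is None) != (wait_seconds is None):
        raise ValueError("specify both a source time action and a wait clause, or neither")
    return action


# Annotation only, with "wait 2 hours" expressed as 7200 seconds.
print(resolve_time_from(None, "getSourceTime", 7200.0))
# A time from clause overrides the annotation.
print(resolve_time_from("otherTime", "getSourceTime", 60.0))
```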
A single query can mix inputs that have source times with inputs that do not. An event input without a source time is equivalent to using currentTime (that is, the correlator's current time, see currentTime) as the source time, with a wait time of 0.
Event definitions may have a @DefaultWait annotation, which specifies the default value to use for the wait time (see also Adding predefined annotations). This annotation is only informational: the Design tab in Software AG Designer uses it to set the default wait time when editing query files. The query must always specify the wait time, even if it uses the default value. Note that the editor copies the value from the annotation, so changing the annotation does not affect existing query definitions.
Definitive time of a query event source
Given that input events may be delayed or out of order, how does the query runtime know when it is safe to process events? To answer this question, we introduce the concept of definitive time. The definitive time is the point in time up to which the query runtime is entitled to assume that it has received all the events it is going to receive. All events at or before this point in time are considered definitive and can be used to evaluate the query. Events after the definitive time are not processed until they become definitive (that is, until the definitive time has advanced so that the events are at or before it). The query runtime assumes that no further events will be received with a time before the definitive time, and evaluates only events that occurred at or before the definitive time.
In the case of an individual query input, the definitive time of that input is the latest of:
*The timestamp of the latest event received (unless the event definition is marked as occurring out of order, see Out of order events).
*The timestamp of the latest heartbeat event, if specified (see Using heartbeat events with source timestamps).
*The correlator's current time minus the maximum wait time specified for that input.
The query's overall definitive time is then the earliest (minimum) of the definitive times of its inputs.
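The two rules above can be written as two small functions. This Python sketch is a model, not Apama code: the function names are hypothetical, times are float seconds, and it assumes in-order events (for an input marked as out of order, leave latest_event as None, since its event timestamps do not count). An input without source times behaves as if its source time were the current time with a wait of 0, which the usage example models by passing wait=0.0.

```python
from typing import Iterable, Optional


def input_definitive_time(now: float, wait: float,
                          latest_event: Optional[float] = None,
                          latest_heartbeat: Optional[float] = None) -> float:
    """Definitive time of one input: the latest of the newest event timestamp
    (in-order inputs only), the newest heartbeat timestamp, and now - wait."""
    candidates = [now - wait]
    if latest_event is not None:
        candidates.append(latest_event)
    if latest_heartbeat is not None:
        candidates.append(latest_heartbeat)
    return max(candidates)


def query_definitive_time(input_times: Iterable[float]) -> float:
    """The query's definitive time is the earliest of its inputs' definitive times."""
    return min(input_times)


HOUR = 3600.0
now = 10 * HOUR  # 10:00, as seconds after midnight
source_input = input_definitive_time(now, wait=1 * HOUR, latest_event=9.5 * HOUR)
plain_input = input_definitive_time(now, wait=0.0)  # input without source times
print(query_definitive_time([source_input, plain_input]))  # 34200.0, i.e. 09:30
```

Taking the minimum across inputs is what makes the query wait for its slowest input, as the worked examples show.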
If no events (input or heartbeat) are received, a query may need to wait before it can evaluate the events it has already received. This is particularly likely if the pattern uses the wait operator, or if the query has more than one input and some inputs have not received any events.
The concept of definitive time is best explained using worked examples. Consider, first, a query with a single input event type.
using com.apama.queries.TimeFrom;

@TimeFrom("getSourceTime")
event E {
    integer k;
    float sourceTime;

    action getSourceTime() returns float {
        return sourceTime;
    }
}

query SingleInput {

    inputs {
        E() key k within 1 hour wait 2 hours;
    }

    find E as e1 -> E as e2 where e2.getSourceTime() - e1.getSourceTime() > 600.0
    {
        log "Time gap " + (e2.getSourceTime() - e1.getSourceTime()).toString();
    }
}
Because this query has only a single input type, its definitive time is the later of the source timestamp of the most recent event and the current time minus the wait time (2 hours in this example). The following table shows how the query runtime keeps track of the definitive time as it receives input events; the wall time is the correlator's current time.
Wall time | E event source timestamp | Query definitive time | Result | Explanation
--- | --- | --- | --- | ---
10:00 | 07:00 | 08:00 | |
10:05 | 07:30 | 08:05 | Nothing | Events are too old.
10:10 | 08:30 | 08:30 | |
10:24 | 08:32 | 08:32 | Nothing | Event timestamps were only 2 minutes apart.
10:26 | 08:50 | 08:50 | Time gap 18 minutes |
10:30 | 10:30 | 10:30 | Nothing | Only 1 event in the "within 1 hour" window.
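The definitive-time column of the table above can be replayed with a short simulation. This Python sketch is a model, not Apama code: times are minutes after midnight, events arrive in order, and an event counts as too old when its delay exceeds the wait time (the inclusive rule described under Waiting for delayed events). It reproduces the definitive times and flags the too-old events; it does not evaluate the find pattern. The names hm, fmt and simulate are hypothetical.

```python
def hm(text: str) -> int:
    """Convert 'HH:MM' to minutes after midnight."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


def fmt(minutes: int) -> str:
    """Convert minutes after midnight back to 'HH:MM'."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


WAIT = 120  # "wait 2 hours", in minutes

# (wall time, E event source timestamp) for each row of the table.
ARRIVALS = [("10:00", "07:00"), ("10:05", "07:30"), ("10:10", "08:30"),
            ("10:24", "08:32"), ("10:26", "08:50"), ("10:30", "10:30")]


def simulate(arrivals, wait):
    """Return (wall time, source time, definitive time, too old?) per arrival."""
    latest = None
    rows = []
    for wall, ts in arrivals:
        now, source = hm(wall), hm(ts)
        # Delayed by more than the wait time (inclusive rule): too old to use.
        too_old = now - source > wait
        latest = source if latest is None else max(latest, source)
        # Single input: the later of the newest event and now minus the wait.
        definitive = max(latest, now - wait)
        rows.append((wall, ts, fmt(definitive), too_old))
    return rows


for row in simulate(ARRIVALS, WAIT):
    print(row)
```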
Now consider a more complex case where the query has two input event types. Events of type E are defined as above, but we add another definition for events of type X.
@TimeFrom("getSourceTime")
event X {
    integer k;
    float sourceTime;

    action getSourceTime() returns float {
        return sourceTime;
    }
}

query MultipleInputs {

    inputs {
        E() key k within 1 hour wait 1 hour;
        X() key k within 1 hour wait 1 hour;
    }

    find E as e1 -> E as e2 without X as x {
        log "Got (" + e1.sourceTime.toString() + ", "
            + e2.sourceTime.toString() + ")";
    }
}
Once again, the table below shows how the definitive time of the query is determined. Here the runtime must take the definitive time to be the earliest of the definitive times of the input types: because the pattern depends on all input types, the runtime has a definitive view of all the query inputs only up to that point.
For example, at wall time 09:22, even though the runtime has received E events with source timestamps 08:32 and 08:40, it cannot conclude that there is a match for the query pattern, because the most recent X event has a timestamp of only 08:25; it does not yet know whether there was an X event between 08:32 and 08:40 that would prevent a match. The wait time of 1 hour has not yet elapsed, so the definitive time of the query remains at 08:25, the source time of the most recent X event.
Not until wall time 09:23 does the runtime receive another X event, with a source timestamp of 08:50. At this point, given that in this example events are delivered in order, the runtime can safely assume that there were no other X events between 08:25 and 08:50, so it evaluates the query and reports matches for two pairs of E events ("08:30, 08:32" and "08:32, 08:40"). The receipt of this X event also allows the runtime to advance the overall definitive time of the query to 08:40, which is now the earliest of the definitive times of the query inputs.
Wall time | E event source timestamp | X event source timestamp | Query definitive time | Result | Explanation
--- | --- | --- | --- | --- | ---
09:20 | 08:30 | 08:25 | 08:25 | |
09:21 | 08:32 | | 08:25 | Nothing yet | Still waiting for an X.
09:22 | 08:40 | | 08:25 | |
09:23 | | 08:50 | 08:40 | Got (08:30, 08:32); Got (08:32, 08:40) |
09:24 | 08:55 | | 08:50 | No 08:40 - 08:55 match | There is an X at 08:50.
09:25 | 09:00 | | 08:50 | Nothing yet | Still waiting for X after 08:50.
09:26 | | 08:57 | 08:57 | No 08:55 - 09:00 match | There is an X at 08:57.
09:27 | 09:10 | | 08:57 | Nothing yet | Still waiting for X after 08:57.
10:10 | | | 09:10 | Got (09:00, 09:10) | We waited for 1 hour for an X.
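The two-input table can be replayed the same way. This Python sketch is a simplified model, not Apama code, with hypothetical names throughout: times are minutes after midnight, events arrive in order, each input's definitive time is the later of its newest event and the current time minus its wait, and the query's definitive time is the minimum across inputs. It assumes e1 and e2 are adjacent E events (which is how the table pairs them) and reports a match only once e2 is at or before the definitive time and no X lies strictly between them. Unlike the runtime in the table, it does not rule out a pair early (as happens at 09:24 and 09:26).

```python
from typing import Dict, List


def hm(text: str) -> int:
    """Convert 'HH:MM' to minutes after midnight."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


def fmt(minutes: int) -> str:
    """Convert minutes after midnight back to 'HH:MM'."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


WAIT = 60  # both inputs use "wait 1 hour", in minutes

# (wall time, E source timestamp or None, X source timestamp or None) per row.
ARRIVALS = [("09:20", "08:30", "08:25"), ("09:21", "08:32", None),
            ("09:22", "08:40", None), ("09:23", None, "08:50"),
            ("09:24", "08:55", None), ("09:25", "09:00", None),
            ("09:26", None, "08:57"), ("09:27", "09:10", None),
            ("10:10", None, None)]


def simulate_two_inputs(arrivals, wait):
    """Return (wall time, query definitive time, matches reported) per row."""
    seen: Dict[str, List[int]] = {"E": [], "X": []}
    evaluated = set()
    rows = []
    for wall, e_ts, x_ts in arrivals:
        now = hm(wall)
        if e_ts is not None:
            seen["E"].append(hm(e_ts))
        if x_ts is not None:
            seen["X"].append(hm(x_ts))
        # Each input: latest of its newest event and now - wait; query: earliest.
        definitive = min(max([now - wait] + times) for times in seen.values())
        matches = []
        e_times = sorted(seen["E"])
        for e1, e2 in zip(e_times, e_times[1:]):
            # Only pairs that are now fully definitive can be decided.
            if e2 <= definitive and (e1, e2) not in evaluated:
                evaluated.add((e1, e2))
                if not any(e1 < x < e2 for x in seen["X"]):
                    matches.append(f"Got ({fmt(e1)}, {fmt(e2)})")
        rows.append((wall, fmt(definitive), matches))
    return rows


for row in simulate_two_inputs(ARRIVALS, WAIT):
    print(row)
```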