Out of order events
When using source timestamps (see also Using source timestamps of events), the query runtime by default expects events to arrive in order. If an event arrives with an earlier source timestamp than a previous event for the same partition, it is discarded. There are two exceptions, described below, in which queries store events that arrive out of order and re-order them so that they are processed in source-time order.
Note:
In both cases described below (the @OutOfOrder() annotation and delayed events), heartbeat events (if specified) are always considered definitive, even if they are delayed. You cannot use an event definition with an @OutOfOrder() annotation as a heartbeat event. As soon as a heartbeat event is processed, the query ignores any events with earlier timestamps.
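The default in-order rule can be illustrated with a small model. The following Python sketch is not Apama code and does not show how the query runtime is implemented; the function name filter_in_order and the tuple layout are assumptions made for illustration. It remembers the latest source timestamp seen for each partition and discards any event that is earlier. Treating an event with an equal timestamp as in order is also an assumption.

```python
def filter_in_order(events):
    """Split events into accepted and discarded, per the default in-order rule.

    events: iterable of (partition, source_time, name) in arrival order.
    """
    latest = {}                      # partition -> latest source time seen
    accepted, discarded = [], []
    for partition, source_time, name in events:
        if partition in latest and source_time < latest[partition]:
            discarded.append(name)   # earlier than a previous event: dropped
        else:
            latest[partition] = source_time
            accepted.append(name)
    return accepted, discarded


# Source times are seconds after 10:00:00; all events share one partition.
arrivals = [("k", 10, "A(1)"), ("k", 20, "A(4)"), ("k", 15, "A(3)"), ("k", 12, "A(2)")]
accepted, discarded = filter_in_order(arrivals)
print(accepted)    # ['A(1)', 'A(4)']
print(discarded)   # ['A(3)', 'A(2)']
```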
Case 1: Using the @OutOfOrder() annotation on the event definition
If the event definition (in an EPL file) has the @OutOfOrder() annotation, which is available in the package com.apama.queries (see also Adding predefined annotations), then the query runtime treats events of that type as not occurring in order.
This means that definitive time is not affected by the timestamp on the events. Thus, events will not be processed until the specified wait time has elapsed since their source time, or a heartbeat event (if specified) with a later timestamp has been processed (and all inputs have had their definitive time moved forward).
It is recommended to use heartbeats when using @OutOfOrder() events. They are not required, but if not used, the query execution will be delayed by the longest input wait specified in the query.
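The interaction between the wait time, definitive time and heartbeats can be sketched with a toy model of a single input. This is a Python illustration under stated assumptions, not the runtime's implementation: the class OutOfOrderInput and its method names are hypothetical, only one input is modeled (so the condition that all inputs have moved forward is ignored), and definitive time is taken to be the correlator time minus the wait unless a heartbeat has moved it further.

```python
import heapq


class OutOfOrderInput:
    """Toy model of one query input whose event type has @OutOfOrder()."""

    def __init__(self, wait):
        self.wait = wait                  # input wait time, in seconds
        self.definitive = float("-inf")   # definitive time of this input
        self.pending = []                 # min-heap of (source_time, name)
        self.processed = []               # names, in source-time order

    def _advance(self, new_definitive):
        # Definitive time never moves backwards.
        self.definitive = max(self.definitive, new_definitive)
        # Process every held event that is no later than the definitive time.
        while self.pending and self.pending[0][0] <= self.definitive:
            self.processed.append(heapq.heappop(self.pending)[1])

    def on_event(self, now, source_time, name):
        """Hold the event; return False if it is too old and is ignored."""
        self._advance(now - self.wait)
        if source_time < self.definitive:
            return False
        heapq.heappush(self.pending, (source_time, name))
        self._advance(self.definitive)
        return True

    def on_heartbeat(self, now, source_time):
        # A heartbeat is definitive: it moves definitive time to its timestamp.
        self._advance(now - self.wait)
        self._advance(source_time)


inp = OutOfOrderInput(wait=20)
inp.on_event(20, 10, "A(1)")          # held: the wait has not elapsed
inp.on_event(22, 15, "A(3)")          # held
inp.on_heartbeat(23, 16)              # releases both events immediately
print(inp.processed)                  # ['A(1)', 'A(3)']
print(inp.on_event(24, 12, "A(2)"))   # False: earlier than the heartbeat
```

Without the heartbeat, A(1) would only be processed 20 seconds after its source time, which is the delay that the recommendation above is intended to avoid.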
The following example compares the behavior if @OutOfOrder() is or is not specified on the input:
query FindAdjacentAEvents {
    inputs {
        A() within 30.0 wait 20 seconds;
    }
    find A as a1 -> A as a2 {
        print "a1 = "+a1.toString()+"; a2 = "+a2.toString();
    }
}
In the following tables, the events are listed in the order in which they are processed, but they occur in the order A(1), A(2), A(3), A(4). Note that A(2) is delayed by more than the wait time of the query (the actual events would have a source timestamp, but we show that as a separate column for clarity).
The following table applies if the event definition does have @OutOfOrder():
Input event | Input event timestamp | Correlator time | Notes | Query definitive time | Query output |
A(1) | 10:00:10 | 10:00:20 | | 10:00:00 | |
A(4) | 10:00:20 | 10:00:30 | | 10:00:10 | |
A(3) | 10:00:15 | 10:00:32 | | 10:00:12 | |
| | 10:00:35 | 20 seconds after A(3)'s source time (10:00:15) | 10:00:15 | a1=A(1); a2=A(3) |
A(2) | 10:00:12 | 10:00:37 | discarded - more than 20 seconds old | 10:00:17 | |
| | 10:00:40 | 20 seconds after A(4)'s source time (10:00:20) | 10:00:20 | a1=A(3); a2=A(4) |
The following table applies if the event definition does not have @OutOfOrder():
Input event | Input event timestamp | Correlator time | Notes | Query definitive time | Query output |
A(1) | 10:00:10 | 10:00:20 | | 10:00:10 | |
A(4) | 10:00:20 | 10:00:30 | | 10:00:20 | a1=A(1); a2=A(4) |
A(3) | 10:00:15 | 10:00:32 | | 10:00:20 | (nothing - event is discarded as it is out of order) |
A(2) | 10:00:12 | 10:00:37 | discarded - more than 20 seconds old | 10:00:20 | |
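The @OutOfOrder() table above can be reproduced with a short simulation. The sketch below is a simplified Python model, not the query runtime: the function simulate_out_of_order is hypothetical, heartbeats and the within window are ignored, and it assumes that definitive time equals the correlator time minus the wait, so each held event is processed once the wait has elapsed since its source time. Times are seconds after 10:00:00.

```python
import heapq


def simulate_out_of_order(arrivals, wait):
    """Model `find A as a1 -> A as a2` over an @OutOfOrder() input.

    arrivals: list of (correlator_time, source_time, name), in arrival order.
    Returns (matches, discarded); each match is (output_time, a1, a2).
    """
    pending = []        # min-heap of (source_time, name) awaiting the wait
    matches, discarded = [], []
    previous = None     # most recently processed event name

    def release_until(now):
        nonlocal previous
        # An event is processed once `wait` has elapsed since its source time.
        while pending and pending[0][0] + wait <= now:
            source_time, name = heapq.heappop(pending)
            if previous is not None:
                matches.append((source_time + wait, previous, name))
            previous = name

    for now, source_time, name in arrivals:
        release_until(now)
        if source_time < now - wait:
            discarded.append(name)        # older than the wait: too late
        else:
            heapq.heappush(pending, (source_time, name))
    release_until(float("inf"))           # let the remaining waits elapse
    return matches, discarded


arrivals = [(20, 10, "A(1)"), (30, 20, "A(4)"), (32, 15, "A(3)"), (37, 12, "A(2)")]
matches, discarded = simulate_out_of_order(arrivals, wait=20)
print(matches)     # [(35, 'A(1)', 'A(3)'), (40, 'A(3)', 'A(4)')]
print(discarded)   # ['A(2)']
```

The two matches are produced at 10:00:35 and 10:00:40, and A(2) is dropped because it arrives 25 seconds after its source time.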
Case 2: Events are delayed
Even when events are normally delivered in order from the data source, a delay that is later resolved can cause a number of delayed events to be processed within a very short space of time. Even if they are delivered to the Apama correlators in the correct order, the query runtime runs in parallel within the correlator, so events processed close together in time may be processed out of order, even if the event definition has no @OutOfOrder() annotation. For this reason, if an event is delayed, the query runtime waits before considering the event's time as definitive for that input.
By default, the query runtime considers an event as delayed if its source time is more than 10 seconds before the correlator's time at the point it is processed, and it will wait for 10 seconds before considering the event's time as definitive for that input. These settings can be modified by sending in a SetDelayedEventsLeeway(delayLeeway, reorderBuffer) event:
com.apama.queries.SetDelayedEventsLeeway(5, 20.0)
The above example would set the query runtime to consider events older than 5 seconds as delayed, and would not consider them definitive until 20 seconds after they were received.
To consider all events in order regardless of delay, send an event with the first value set to infinity (as all actual delays must be less than infinity):
com.apama.queries.SetDelayedEventsLeeway(infinity, 0.0)
These events should be sent to all correlators in a cluster, typically as part of the initialization of the correlator along with injecting the query definitions.
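The effect of the two SetDelayedEventsLeeway values can be summarized in a few lines. The following Python sketch only restates the rule described above; the function classify and its return values are hypothetical and are not part of any Apama API. An event counts as delayed when its source time is more than delay_leeway seconds before the correlator time, and a delayed event becomes definitive reorder_buffer seconds after it was received.

```python
import math


def classify(source_time, correlator_time, delay_leeway=10.0, reorder_buffer=10.0):
    """Return (status, definitive_at) for one event, times in seconds."""
    delay = correlator_time - source_time
    if delay > delay_leeway:
        # Delayed: wait for the reorder buffer before trusting its time.
        return ("delayed", correlator_time + reorder_buffer)
    # Not delayed: treated as in order and definitive straight away.
    return ("in_order", correlator_time)


print(classify(100.0, 107.0))                 # defaults: ('in_order', 107.0)
print(classify(100.0, 107.0, 5, 20.0))        # ('delayed', 127.0)
print(classify(0.0, 1.0e9, math.inf, 0.0))    # ('in_order', 1000000000.0)
```

With the first value set to infinity, no finite delay can exceed the threshold, so every event is treated as in order.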
The following example compares the behavior with different configurations and some delayed events:
query FindAdjacentAEvents {
    inputs {
        A() within 30 minutes wait 10 minutes;
    }
    find A as a1 -> A as a2 {
        print "a1 = "+a1.toString()+"; a2 = "+a2.toString();
    }
}
The following table lists the events where the A event does not have @OutOfOrder(). The last three columns give the behavior with different configurations:
Default config. A. Matches with the default values: 10 seconds delay threshold and 10 seconds reorder buffer.
Config. B. Matches if SetDelayedEventsLeeway(300, 10) is sent: 5 minutes (300 seconds) delay threshold and 10 seconds reorder buffer.
Config. C. Matches if SetDelayedEventsLeeway(10, 60) is sent: 10 seconds delay threshold and 1 minute reorder buffer.
Input event | Input event timestamp | Correlator time | Definitive time of the query for default leeway values | Default config. A | Config. B | Config. C |
A(1) | 10:06:10 | 10:10:30 | 10:00:30 (10 minutes ago) | | | |
A(4) | 10:06:20 | 10:10:31 | 10:00:31 (10 minutes ago) | | a1=A(1); a2=A(4) | |
A(3) | 10:06:15 | 10:10:32 | 10:00:32 (10 minutes ago) | | (A(3) out of order and discarded) | |
A(2) | 10:06:13 | 10:10:33 | 10:00:33 (10 minutes ago) | | (A(2) out of order and discarded) | |
| | 10:10:43 | 10:06:20 (latest A event received) | a1=A(1); a2=A(2) / a1=A(2); a2=A(3) / a1=A(3); a2=A(4) | | |
| | 10:11:33 | | | | a1=A(1); a2=A(2) / a1=A(2); a2=A(3) / a1=A(3); a2=A(4) |
A(6) | 10:12:05 | 10:12:10 | 10:12:05 (latest A event received) | a1=A(4); a2=A(6) | a1=A(4); a2=A(6) | a1=A(4); a2=A(6) |
A(5) | 10:12:04 | 10:12:11 | 10:12:05 (latest A event received) | (none - event A(5) is discarded) | (none - event A(5) is discarded) | (none - event A(5) is discarded) |
Note that A(6) is treated as occurring in order, as it is delayed by less than the delayLeeway value. Thus A(5) is discarded, as it has occurred out of order.
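The three configurations in the table can be compared with a simplified simulation. This Python sketch is a model inferred from the table, not the runtime's implementation: the function simulate_delayed is hypothetical, the wait and within settings of the query are ignored, and it assumes that the reorder buffer restarts with each delayed event (the table shows output 10 seconds after the last delayed event was received at 10:10:33). Times are seconds after 10:00:00.

```python
def simulate_delayed(arrivals, delay_leeway=10.0, reorder_buffer=10.0):
    """Model `find A as a1 -> A as a2` with delayed-event handling.

    arrivals: list of (correlator_time, source_time, name), in arrival order.
    Returns (matches, discarded); each match is (output_time, a1, a2).
    """
    buffered = []       # delayed events held for re-ordering
    flush_at = None     # time at which the held events become definitive
    latest = None       # latest source time processed so far
    previous = None     # most recently processed event name
    matches, discarded = [], []

    def process(now, source_time, name):
        nonlocal latest, previous
        if latest is not None and source_time < latest:
            discarded.append(name)             # out of order: dropped
            return
        if previous is not None:
            matches.append((now, previous, name))
        latest, previous = source_time, name

    def flush(now):
        nonlocal flush_at
        for source_time, name in sorted(buffered):   # source-time order
            process(now, source_time, name)
        buffered.clear()
        flush_at = None

    for now, source_time, name in arrivals:
        if flush_at is not None and flush_at <= now:
            flush(flush_at)
        if now - source_time > delay_leeway:
            buffered.append((source_time, name))
            flush_at = now + reorder_buffer    # assumption: timer restarts
        else:
            # Simplification: processed at once, even if events are held.
            process(now, source_time, name)
    if flush_at is not None:
        flush(flush_at)
    return matches, discarded


# (correlator time, source time, name); for example 630 is 10:10:30.
arrivals = [(630, 370, "A(1)"), (631, 380, "A(4)"), (632, 375, "A(3)"),
            (633, 373, "A(2)"), (730, 725, "A(6)"), (731, 724, "A(5)")]
for leeway, buffer in [(10, 10), (300, 10), (10, 60)]:
    print(leeway, buffer, simulate_delayed(arrivals, leeway, buffer))
```

Under these assumptions, configurations A and C produce the three adjacent matches at 643 (10:10:43) and 693 (10:11:33) respectively, configuration B matches A(1) with A(4) immediately and discards A(3) and A(2), and all three match A(4) with A(6) and discard A(5).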