Retaining the most recent item in each partition of a partitioned stream
Sometimes you want to join the most recent events from two sources on a common key. Typically you are processing all events from those sources rather than a subset of them. This pattern is similar to the previous example, but adds a partition by clause to each side of the join, so that the most recent event is retained separately for each key value.
event Temperature { string sensorId; float temperature; }
event Pressure { string sensorId; float pressure; }
event TemperatureAndPressure { string sensorId; float temperature; float pressure; }

monitor CombineTheLatestTemperatureAndPressureReadings {
    action onload() {
        TemperatureAndPressure tp;
        // Each side of the join keeps only the latest event per sensorId.
        from t in all Temperature() partition by t.sensorId retain 1
            join p in all Pressure() partition by p.sensorId retain 1
            on t.sensorId equals p.sensorId
            select TemperatureAndPressure(t.sensorId, t.temperature, p.pressure) : tp {
                // Runs for each combined reading produced by the join.
                print tp.toString();
        }
    }
}
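To make the effect of the two partitioned windows concrete, here is a small, hypothetical event file that could be sent to the monitor above. The sensor IDs and readings are invented for illustration, and the comments describe the behavior you would expect from the join as described in this section, not captured output from a real run.

```
# Hypothetical sample input; sensor IDs and values are made up.

# No pressure has been seen for S001 yet, so no combined event is expected.
Temperature("S001", 20.0)

# No temperature has been seen for S002 yet, so still nothing is expected.
Pressure("S002", 1010.0)

# Both sides now hold an S001 event: expect S001 with 20.0 and 1000.0.
Pressure("S001", 1000.0)

# Replaces the retained S001 temperature: expect S001 with 21.0 and 1000.0.
Temperature("S001", 21.0)

# Completes the pair for S002: expect S002 with 18.0 and 1010.0.
Temperature("S002", 18.0)
```

Because each window retains one event per sensorId, the new S001 temperature displaces only the earlier S001 temperature. The retained S002 pressure is unaffected and is still available when the S002 temperature arrives.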