Sample EPL
The sample EPL below describes how to subscribe and receive device measurements, device events and device information.
package com.apama.sample;
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
monitor CumulocityApplication {
action onload() {
fetchManagedObjects();
listenForMeasurements();
listenForAlarms();
listenForEvents();
listenForOperations();
}
action fetchManagedObjects() {
// Subscribe to receive all the devices from Cumulocity IoT
monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
// Consume all the devices from Cumulocity IoT
on all ManagedObject() as mo {
log mo.toString() at INFO;
// Update a managed object
/*
mo.params.add("CustomMetadata", {"metadata": "Adding custom data"});
send mo to ManagedObject.SEND_CHANNEL;
*/
}
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
// Fetch a list of all available devices
integer reqId := com.apama.cumulocity.Util.generateReqId();
on all FindManagedObjectResponse(reqId=reqId) as response
and not FindManagedObjectResponseAck(reqId=reqId) {
log "Received managedObject " + response.managedObject.toString() at INFO;
}
on FindManagedObjectResponseAck(reqId=reqId) {
log "Find Managed Objects request completed" at INFO;
monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}
// Retrieve list of all available devices
send FindManagedObject(reqId, "", {"fragmentType": "c8y_IsDevice"})
to FindManagedObject.SEND_CHANNEL;
}
action listenForMeasurements() {
// Subscribe to receive all the measurements published from
// Cumulocity IoT
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
// Consume all the measurements from Cumulocity IoT
on all Measurement() as m {
log m.toString() at INFO;
}
// Create a new measurement
/*
Measurement m := new Measurement;
m.source := "<MANAGED_OBJECT_ID>";
m.time := currentTime;
m.type := "TemperatureMeasurement";
MeasurementValue mv := new MeasurementValue;
mv.value := 100.0;
dictionary<string, MeasurementValue> fragment :=
new dictionary<string, MeasurementValue>;
fragment.add("temperature", mv);
m.measurements.add("TemperatureMeasurement", fragment);
send m to Measurement.SEND_CHANNEL;
*/
}
action listenForEvents() {
// Subscribe to receive all the events published from
// Cumulocity IoT
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
// Consume all the events from Cumulocity IoT
on all Event() as e {
log e.toString() at INFO;
// Example for updating an event
/*
// update text
e.text := "This is an updated text";
send e to Event.SEND_CHANNEL;
*/
}
// Create a new event
/*
Event evt := new Event;
evt.source := "<MANAGED_OBJECT_ID>";
evt.type := "TestEvent";
evt.time := currentTime;
evt.text := "This is a sample event";
send evt to Event.SEND_CHANNEL;
*/
}
action listenForAlarms() {
// Subscribe to receive all the alarms published from
// Cumulocity IoT
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
// Consume all the alarms from Cumulocity IoT
on all Alarm() as alarm {
log alarm.toString() at INFO;
// Example for updating an alarm
/*
// set alarm severity to MAJOR
alarm.severity := "MAJOR";
send alarm to Alarm.SEND_CHANNEL;
*/
}
// Create a new alarm
/*
Alarm alarm := new Alarm;
alarm.source := "<MANAGED_OBJECT_ID>";
alarm.type := "TestAlarm";
alarm.severity := "MINOR";
alarm.status := "ACTIVE";
alarm.time := currentTime;
alarm.text := "This is a sample alarm";
send alarm to Alarm.SEND_CHANNEL;
*/
}
action listenForOperations() {
// Subscribe to receive all the operations published from
// Cumulocity IoT
// Note: When using the Cumulocity transport, ensure that the
// subscribeToOperations transport property is set to true
monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
on all Operation() as o {
log o.toString() at INFO;
// Update an operation
/*
o.status := "EXECUTING";
send o to Operation.SEND_CHANNEL;
*/
}
// Create an operation
/*
Operation operation := new Operation;
operation.source := "<MANAGED_OBJECT_ID>";
operation.status := "PENDING";
operation.params.add("c8y_Message", {"text": "Device Operation"});
send operation to Operation.SEND_CHANNEL;
*/
}
}