Working with multi-tenant deployments
An application that works in multi-tenant deployments must handle the subscription and unsubscription of tenants, and must send events to and receive events from the subscribed tenants.
The application can use the com.apama.cumulocity.GetAllTenantsSubscribedToApplication event to get all tenants that are currently subscribed to the application. To be notified of later subscriptions and unsubscriptions, it can listen for the com.apama.cumulocity.ApplicationSubscribedForTenant and com.apama.cumulocity.ApplicationUnsubscribedForTenant events.
Alternatively, and this is the preferred approach, the application can use the com.apama.cumulocity.TenantSubscriptionNotifier event to get callback notifications about the currently subscribed tenants as well as about any future tenant subscriptions and unsubscriptions. When using the TenantSubscriptionNotifier event, a new monitor instance is spawned for each tenant subscription in a private context for that tenant, and the subscription callback action is called from within the new monitor instance. This keeps the state and processing for each tenant separate from those of other tenants. When the tenant is unsubscribed, the unsubscription callback action, if registered, is called within the same monitor instance. The monitor instance is terminated after the callback action has been processed.
The subscription callback provides a com.apama.cumulocity.TenantDetails event, which is used to create connection objects for that tenant. The connection objects provide getChannel actions that return the tenant-specific channels for sending events to, and receiving events from, that tenant. See the documentation of the FullyConcurrentConnection, SerialConnection, AutoConcurrentConnection, and SharedConnection events in the com.apama.cumulocity package for more details.
An application that is developed to handle multi-tenant deployments using the events mentioned above also works in per-tenant deployments.
The example below shows a per-tenant application that implements a simple threshold rule, which raises an alarm when the value of a measurement is greater than the threshold.
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.Alarm;

/**
 * This application monitors measurements and raises an alarm if a value is
 * greater than the threshold.
 */
monitor ThresholdAlarmRule {
    /** The measurement threshold value. */
    constant float THRESHOLD := 10.0;

    action onload() {
        // Listen for measurements and raise an alarm if a measurement exceeds the threshold.
        monitorMeasurements();
    }

    /** Listen for measurements and raise an alarm if a measurement exceeds the threshold. */
    action monitorMeasurements() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        // Listen for all MeasurementFragment events of the types of interest.
        on all MeasurementFragment(type="my_measurement", valueFragment="my_frag", valueSeries="my_series") as m {
            // Raise an alarm if the value is greater than the threshold.
            if m.value > THRESHOLD {
                Alarm alarm := Alarm("", "my_alarm", m.source, currentTime, "Threshold exceeded",
                                     "ACTIVE", "MINOR", 1, new dictionary<string,any>);
                // Send the alarm to the default alarm channel.
                send alarm to Alarm.SEND_CHANNEL;
            }
        }
    }
}
The example below shows how to convert an existing per-tenant EPL application so that it also works in multi-tenant mode.
using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;
using com.apama.cumulocity.SharedConnection;

using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.Alarm;

/**
 * This application monitors measurements and raises an alarm if a value is
 * greater than the threshold.
 *
 * The application monitors tenant subscriptions and unsubscriptions. It starts
 * listening for measurements when a tenant is subscribed and stops listening
 * when the tenant is unsubscribed.
 *
 * The application works correctly in both per-tenant and multi-tenant deployments.
 */
monitor ThresholdAlarmRule {
    /** The measurement threshold value. */
    constant float THRESHOLD := 10.0;

    /** Connection object for the tenant. It is used to get the correct channels for sending and receiving events. */
    SharedConnection connection;

    action onload() {
        // Create a new instance of TenantSubscriptionNotifier and register callbacks for
        // subscriptions and unsubscriptions.
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().
            onSubscription(tenantSubscribed).
            onUnsubscription(tenantUnsubscribed);
    }

    /** Action which is called back when a tenant is subscribed. */
    action tenantSubscribed(TenantDetails tenant) {
        // Create a connection object for this tenant so that we can send events to
        // this tenant and receive events from it.
        // In most cases, a SharedConnection is sufficient, but also see the
        // FullyConcurrentConnection, SerialConnection and AutoConcurrentConnection events
        // to create connections with different semantics.
        connection := SharedConnection.createForTenant(tenant);

        // Now listen for measurements and raise an alarm if a measurement exceeds the threshold.
        monitorMeasurements();
    }

    /** Listen for measurements and raise an alarm if a measurement exceeds the threshold. */
    action monitorMeasurements() {
        // Subscribe to the tenant-specific channel on which measurements are received.
        monitor.subscribe(connection.getChannel(MeasurementFragment.SUBSCRIBE_CHANNEL));

        // Listen for all MeasurementFragment events of the types of interest.
        on all MeasurementFragment(type="my_measurement", valueFragment="my_frag", valueSeries="my_series") as m {
            // Raise an alarm if the value is greater than the threshold.
            if m.value > THRESHOLD {
                Alarm alarm := Alarm("", "my_alarm", m.source, currentTime, "Threshold exceeded",
                                     "ACTIVE", "MINOR", 1, new dictionary<string,any>);
                // Get the tenant-specific channel to which the alarm must be sent.
                send alarm to connection.getChannel(Alarm.SEND_CHANNEL);
            }
        }
    }

    /** Action which is called back when a tenant is unsubscribed. */
    action tenantUnsubscribed(string tenantId) {
        // Destroy the connection object.
        connection.destroy();
        // There is no need to explicitly terminate this monitor instance;
        // it is terminated automatically.
    }

    action ondie() {
        // Destroy the connection object.
        connection.destroy();
    }
}
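The per-tenant isolation described earlier (one monitor instance per subscribed tenant, each with its own state) can be illustrated with a further minimal sketch. It uses only the actions that appear in the example above. The monitor name PerTenantMeasurementCounter and the measurementCount variable are hypothetical names introduced for illustration, and the sketch assumes that each spawned monitor instance starts with its own copy of the monitor's variables, as is usual for spawned EPL monitor instances.

```epl
using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;
using com.apama.cumulocity.SharedConnection;
using com.apama.cumulocity.MeasurementFragment;

/** Hypothetical example: counts the measurements received for each tenant. */
monitor PerTenantMeasurementCounter {
    /** Connection object for the tenant handled by this monitor instance. */
    SharedConnection connection;

    /** Per-tenant state: each tenant's monitor instance has its own counter. */
    integer measurementCount := 0;

    action onload() {
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().
            onSubscription(tenantSubscribed).
            onUnsubscription(tenantUnsubscribed);
    }

    /** Called in the new monitor instance that is spawned for the tenant. */
    action tenantSubscribed(TenantDetails tenant) {
        connection := SharedConnection.createForTenant(tenant);
        monitor.subscribe(connection.getChannel(MeasurementFragment.SUBSCRIBE_CHANNEL));

        // Only measurements of this tenant arrive on the tenant-specific channel.
        on all MeasurementFragment() {
            measurementCount := measurementCount + 1;
        }
    }

    /** Called in the same monitor instance when the tenant is unsubscribed. */
    action tenantUnsubscribed(string tenantId) {
        log "Tenant " + tenantId + " unsubscribed after "
            + measurementCount.toString() + " measurements" at INFO;
        connection.destroy();
    }
}
```

Because the counter lives in the monitor instance for a single tenant, no dictionary keyed by tenant identifier is needed, and the count for one tenant is not affected by measurements from another.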