Reliable messaging with Digital Event Services
Digital Event Services supports reliable messaging, provided that the following requirements are met:
* Reliable sending. The delivery mode of the Digital Event Services event type must be persistent. This allows you to perform flush operations for that event type.
* Reliable receiving. The delivery mode of the Digital Event Services event type must be persistent. In addition, a subscriberId must be set in the configuration file. With this configuration, you must acknowledge events of that type.
For detailed information on how to configure the delivery mode, see Using Digital Event Services to Communicate between Software AG Products.
See the properties file DigitalEventServices.properties for information on the subscriberId and other configuration options. For more detailed information on using reliable messaging in general, see Using reliable transports.
Shared durable subscribers
The Digital Event Services transport makes use of shared durable subscribers for reliable receiving. When a single correlator is connected with a particular subscriberId, the correlator will receive and acknowledge events. Events are resent after a failure once the failed component has been restarted/reconnected. With multiple correlators sharing the same subscriberId, events are delivered in a round-robin fashion to each available receiver.
If all subscribed monitor instances explicitly unsubscribe from a type, or if those monitor instances terminate, this does not count as a failure. Events of this type that are sent afterwards are not received, and they are not resent upon resubscription.
CAUTION:
In a system with multiple correlators sharing the same subscriberId, an explicit unsubscribe from one correlator will unsubscribe the other correlators from that type.
Reliable receiving
The CHANNEL constant on the auto-generated EPL type allows you to find the connectivity chain (Chain) used for receiving events of this type, so that the Chain can be used for reliable messaging operations.
Example:
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.control.AckRequired;
using a.pkg.myEvent; // our auto-generated EPL type from DES

monitor receiver
{
    action onload()
    {
        monitor.subscribe(myEvent.CHANNEL);
        Chain c := ConnectivityPlugins.getChainByChannel(myEvent.CHANNEL,
            Direction.TOWARDS_HOST);
        on all myEvent() as ev
        {
            // process ev
        }
        on all AckRequired(chainId=c.getId()) as ar
        {
            // make sure all events before the AckRequired have been fully processed
            ...
            // and only acknowledge them once that is done
            ar.ackUpTo();
        }
    }
}
If you need the message identifier of an event for per-event acknowledgment (Chain.ackUpTo), it is available as a field on the event. The MessageId annotation on the auto-generated EPL representation of the Digital Event Services type identifies which field this is.
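The per-event acknowledgment described above might look like the following minimal sketch. It makes two assumptions: the field name messageId is hypothetical (use whichever field the MessageId annotation of your generated type names), and Chain.ackUpTo is assumed to accept that identifier as its argument. Check both against the generated EPL type and the API reference before relying on this.

```
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using a.pkg.myEvent; // our auto-generated EPL type from DES

monitor perEventReceiver
{
    action onload()
    {
        monitor.subscribe(myEvent.CHANNEL);
        Chain c := ConnectivityPlugins.getChainByChannel(myEvent.CHANNEL,
            Direction.TOWARDS_HOST);
        on all myEvent() as ev
        {
            // process ev completely before acknowledging it

            // "messageId" is a hypothetical field name; the real one is
            // named by the MessageId annotation on the generated type
            c.ackUpTo(ev.messageId);
        }
    }
}
```

Compared with waiting for AckRequired, this approach acknowledges at a point that the application chooses, at the cost of one acknowledgment call per event.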
Reliable sending
Again, the CHANNEL constant allows you to find the connectivity chain (Chain) used for sending events of this type, so that the Chain can be used for reliable messaging operations.
Important:
When using reliable sending, the Digital Event Services storage location must be protected against data loss, because events are acknowledged after they have been persisted to disk but before they are sent to the remote system. This does not apply if the store-and-forward queue has been disabled. In that case, events are acknowledged only once they have been committed to the remote system.
Example:
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.control.FlushAck;
using a.pkg.myEvent; // our auto-generated EPL type from DES

monitor sender
{
    action onload()
    {
        Chain c := ConnectivityPlugins.getChainByChannel(myEvent.CHANNEL,
            Direction.TOWARDS_TRANSPORT);
        on all wait (0.1)
        {
            send myEvent("hello") to myEvent.CHANNEL;
            // flush after each send and listen for the acknowledgment
            on FlushAck(requestId = c.flush()) as fa
            {
                // event acknowledged, we no longer need to hold on to it
            }
        }
    }
}
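To make "holding on to" an event until it is acknowledged more concrete, the following sketch keeps each sent event in a dictionary until the matching FlushAck arrives. The pending dictionary and the sendReliably action are hypothetical names introduced for illustration and are not part of the Apama API. The sketch also assumes that flush() returns the integer that later appears in FlushAck.requestId, as the example above implies. The bookkeeping is held in memory only.

```
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.control.FlushAck;
using a.pkg.myEvent; // our auto-generated EPL type from DES

monitor trackingSender
{
    // hypothetical bookkeeping: flush request ID -> event awaiting acknowledgment
    dictionary<integer, myEvent> pending;
    Chain c;

    action onload()
    {
        c := ConnectivityPlugins.getChainByChannel(myEvent.CHANNEL,
            Direction.TOWARDS_TRANSPORT);
        sendReliably(myEvent("hello"));
    }

    // hypothetical helper: send one event and keep it until it is acknowledged
    action sendReliably(myEvent e)
    {
        send e to myEvent.CHANNEL;
        // assumption: flush() returns the ID echoed in FlushAck.requestId
        integer id := c.flush();
        pending[id] := e;
        on FlushAck(requestId = id)
        {
            // event acknowledged, drop our copy
            pending.remove(id);
        }
    }
}
```

Any events still in pending at a given moment are the ones that have not yet been acknowledged, so they are the candidates for resending after a failure.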