Using reliable transports
Reliable messaging gives you the tools to write event-processing applications that are resilient against message loss (for example, due to crashes or network outages).
To make use of reliable messaging in your connectivity plug-ins, you must:
*Configure the transports for reliable messaging. The nature of this configuration is transport-specific. Reliable messaging is only supported in chains that use the apama.eventMap host plug-in. See also Translating EPL events using the apama.eventMap host plug-in.
*Write EPL to send and receive events via these transports and to handle acknowledgments. For detailed information, see the event descriptions for Chain, AckRequired and FlushAck in the com.softwareag.connectivity and com.softwareag.connectivity.control packages in the API Reference for EPL (ApamaDoc).
Note:  
Not all transports support reliable messaging.
Reliable-messaging-aware transports only support at-least-once delivery, which admits the possibility of duplicate messages, especially after recovery from downtime. Your applications should be written to handle this.
In this version, reliable messaging with connectivity plug-ins is controlled exclusively from EPL. It cannot currently be tied in automatically to correlator persistence.
If a license file cannot be found, reliable messaging with connectivity plug-ins is disabled. See Running Apama without a license file.
Message IDs
Messages going from the transport to the host contain unique message IDs. Each message ID is stored as sag.messageId in the metadata. See Metadata values. You only need access to the message IDs if you want to acknowledge individual events.
When a message becomes an EPL event (via the apama.eventMap host plug-in), its message ID is not automatically available as a field on the event, because metadata fields are not mapped to event fields. If you require this behavior, configure the transport or a codec to perform this operation. If you cannot change the transport behavior, you can use the copyFrom action of the Mapper codec to copy the sag.messageId metadata value into a field of the payload. It is then available as a field on the event (assuming that the event definition declares that field). See also The Mapper codec connectivity plug-in.
Caution:  
If you are using codecs, it is important to copy the value from sag.messageId. Moving the value (and thus removing it from sag.messageId) will disrupt reliable receiving.
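As a rough illustration of the copyFrom approach described above, a chain definition might contain a Mapper codec entry along the following lines. This is only a sketch under stated assumptions: the wildcard event type "*" and the payload field name messageId are illustrative choices, not taken from this topic, and the exact rule syntax should be checked against The Mapper codec connectivity plug-in.

```yaml
# Hypothetical fragment of a connectivity chain definition.
# "messageId" is an illustrative payload field name; the EPL event
# definition would need a matching field.
- mapperCodec:
    "*":
      towardsHost:
        copyFrom:
          # Copy (do not move) so that sag.messageId stays in the metadata
          - payload.messageId: metadata.sag.messageId
```

Because the rule copies rather than moves the value, sag.messageId remains in the metadata and reliable receiving is not disrupted.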
Chains
In general, when receiving or sending reliably, you need to know which connectivity chain is receiving (from transport to host) or sending (from host to transport) the events. To identify that connectivity chain, you use the EPL Chain event, which provides a wrapper with helpful actions pertaining to reliability. There are two actions on the ConnectivityPlugins event that can be used to get the Chain event for the chain you want:
*ConnectivityPlugins.getChainByChannel
This action looks up a chain instance by a channel it is subscribed to or sending to.
*ConnectivityPlugins.getChainById
This action looks up a chain instance by its identifier.
See the API Reference for EPL (ApamaDoc) for more information on these actions.
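The two lookups can be sketched as follows. The channel name is illustrative, and the sketch assumes that getChainById takes the identifier as a string, such as the one returned by Chain.getId(). Check the ApamaDoc for the exact signatures.

```
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;

monitor ChainLookupSketch {
    action onload() {
        // "incomingEvents" is an illustrative channel name
        monitor.subscribe("incomingEvents");

        // Look up the chain that delivers events to us on this channel
        Chain byChannel := ConnectivityPlugins.getChainByChannel(
            "incomingEvents", Direction.TOWARDS_HOST);

        // Look up the same chain again, this time by its identifier
        // (assumption: the identifier is passed as a string)
        Chain byId := ConnectivityPlugins.getChainById(byChannel.getId());

        log "Found chain " + byId.getId() at INFO;
    }
}
```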
Reliable receiving
A transport can be configured for reliable receiving. In this case, events travel from the outside world into the correlator, and the aim is to make sure that they are not lost if a failure occurs.
The EPL that receives the events is obliged to acknowledge when the events have been fully processed by the application. That is because the remote system to which a reliable transport connects typically keeps track of what messages have been acknowledged and what messages have not been acknowledged. In the event of a failure, any messages that have not been acknowledged are resent to you after reconnection/restart.
Keep in mind that "fully processed" is different from just receiving an event. It means that you have preserved the effect of that event, and done so safely enough that you will no longer need the event to be resent in the event of a failure. As an example, that might mean committing the contents of the event to a database, writing it to a file, or having sent an output event and received an acknowledgment for it.
There are often performance implications for an application that is late with acknowledgments. You should therefore acknowledge all events as soon as possible after they have been fully processed. There is no guarantee, however, that an acknowledgment will be processed immediately. For example, if you acknowledge some events in EPL and the system goes down shortly afterwards, the acknowledgment may not have reached the remote system, and the events will therefore be redelivered.
Once the events have been fully processed by the EPL application, they can be acknowledged in either of the following ways:
*Listen for AckRequired events from the com.softwareag.connectivity.control package, and call the ackUpTo action on them. Doing it this way means you are acknowledging potentially large batches of events at a time.
*Use the ackUpTo action on the Chain event type to acknowledge all previously received events, up to and including a specific event of your choice. In this case, you identify the specific event with its message ID. But keep in mind that the message ID may not always be accessible to the EPL application.
The detailed technical reasons for choosing between the above mechanisms are given in the descriptions of the Chain and AckRequired events in the API Reference for EPL (ApamaDoc).
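The second mechanism, acknowledging up to a specific event by its message ID, might look like the following sketch. It assumes that the event definition has a messageId field populated from sag.messageId (for example, by a codec), that Chain.ackUpTo accepts that ID as a string, and that a fictional plug-in named filePlugin provides a writeAndSync action. All names other than the Chain and ConnectivityPlugins actions are hypothetical.

```
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;

// Hypothetical event definition; messageId is assumed to be filled in
// from the sag.messageId metadata value by the transport or a codec
event MyEvent {
    string messageId;
    string data;
}

monitor AckByMessageIdSketch {
    // Fictional plug-in, used only for illustration
    import "filePlugin" as filePlugin;

    action onload() {
        monitor.subscribe("incomingEvents");
        Chain chn := ConnectivityPlugins.getChainByChannel(
            "incomingEvents", Direction.TOWARDS_HOST);

        on all MyEvent() as e {
            // First make the effect of the event durable
            filePlugin.writeAndSync(e.toString());
            // Then acknowledge all events up to and including this one
            chn.ackUpTo(e.messageId);
        }
    }
}
```

Acknowledging after every event keeps the window of unacknowledged events small, at the cost of one synchronous write per event. The AckRequired approach trades a larger window for batching.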
The following EPL example shows how an application can write reliably-received events to a file. It uses a fictional plug-in named filePlugin for this purpose.
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.AckRequired;
...
 
monitor.subscribe("incomingEvents"); // The chain is sending us events
                                     // on this channel
Chain chn := ConnectivityPlugins.getChainByChannel("incomingEvents",
    Direction.TOWARDS_HOST); // Get the chain itself

on all MyEvent() as e { // The events the application is interested in
    evtSequence.append(e);
}

on all AckRequired(chainId=chn.getId()) as ackRequired {
    // Periodically acknowledge all previously received events,
    // but only after safely writing their contents to a file
    filePlugin.writeAndSync(evtSequence.toString());
    evtSequence.clear();
    ackRequired.ackUpTo();
}
In all reliable receiving, you have to consider the possibility that some events that have already been acknowledged might be resent to your application, especially when recovering after a failure. Your application should be written to either eliminate duplicates or tolerate them.
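One simple way to eliminate duplicates is to remember the message IDs that have already been processed. The following is a minimal sketch, assuming a hypothetical MyEvent definition whose messageId field carries the sag.messageId value. The channel name and the process action are likewise illustrative.

```
// Hypothetical event definition; messageId is assumed to carry the
// value of the sag.messageId metadata field
event MyEvent {
    string messageId;
    string data;
}

monitor DeduplicateSketch {
    // IDs of events that have already been processed (in memory only)
    dictionary<string, boolean> seen;

    action onload() {
        monitor.subscribe("incomingEvents");

        on all MyEvent() as e {
            if seen.hasKey(e.messageId) then {
                log "Ignoring duplicate " + e.messageId at INFO;
            } else {
                seen.add(e.messageId, true);
                process(e);
            }
        }
    }

    // Placeholder for the real application logic
    action process(MyEvent e) {
        log "Processing " + e.toString() at INFO;
    }
}
```

This sketch only catches duplicates within a single run of the correlator, and the dictionary grows without bound. Because redelivery is most likely after a restart, a real application would need to store the processed IDs durably or make its processing idempotent.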
Any towards-host messages that get dropped by the chain due to errors (for example, when a codec cannot translate from one form to another due to an invalid format) are treated as if they have already been acknowledged.
Reliable sending
Reliable sending is symmetrical with reliable receiving for most purposes. With reliable sending, your EPL application can ask to be notified when a remote system has safely processed your events. As before, you have to know what chain is being used for reliable sending of these events, and so you have to get the relevant Chain instance. After sending some events, you may call the flush() action of the Chain.
Once all events sent to the chain before this call have been safely stored on (or have been processed by) the remote system, your application will see an acknowledgment in the form of a FlushAck event. Your application might then respond to this, for example, by acknowledging the reliably-received events that caused these events to be sent, or by recording the fact that the events have been sent so that the application will not send them again. Your application should be written such that, in the event of a restart, it is able to resend any events that were sent but not acknowledged in its previous incarnation.
The following is an example of an EPL application using the flush() action:
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.FlushAck;
...
 
// Get the chain for the channel we are sending events to
Chain chn := ConnectivityPlugins.getChainByChannel("chanWeSendTo",
    Direction.TOWARDS_TRANSPORT);

on all wait(0.1) {
    globalInteger := globalInteger + 1;
    MyEvent e := MyEvent(globalInteger);
    send e to "chanWeSendTo";

    // Request a flush and wait for the matching acknowledgment
    on FlushAck(requestId=chn.flush()) {
        // All events sent before the flush have now been safely
        // processed by the remote system
        log "Fully sent " + e.toString() at INFO;
    }
}
Your application should be written with the idea that it might send duplicate events in the case of a problem (for example, if your application sends out some events which are then processed by some remote system, but there is a crash before your application can see the FlushAck event).
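Reliable receiving and reliable sending can be combined, so that incoming events are acknowledged only after the outgoing events they caused have been flushed. The following is a sketch of that pattern, not a definitive implementation. The InputEvent and OutputEvent definitions and both channel names are hypothetical, and only the Chain, AckRequired and FlushAck usage is taken from the examples in this topic.

```
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.AckRequired;
using com.softwareag.connectivity.control.FlushAck;

// Hypothetical event definitions
event InputEvent { string data; }
event OutputEvent { string data; }

monitor ForwardReliablySketch {
    action onload() {
        monitor.subscribe("incomingEvents");
        Chain inChain := ConnectivityPlugins.getChainByChannel(
            "incomingEvents", Direction.TOWARDS_HOST);
        Chain outChain := ConnectivityPlugins.getChainByChannel(
            "chanWeSendTo", Direction.TOWARDS_TRANSPORT);

        // Each incoming event causes one outgoing event
        on all InputEvent() as input {
            send OutputEvent(input.data) to "chanWeSendTo";
        }

        on all AckRequired(chainId=inChain.getId()) as ackRequired {
            // Flush the outgoing chain; once the FlushAck arrives, the
            // events sent so far are safe on the remote system
            on FlushAck(requestId=outChain.flush()) {
                // Only now acknowledge the events that caused them
                ackRequired.ackUpTo();
            }
        }
    }
}
```

If the correlator fails between the send and the FlushAck, the incoming events remain unacknowledged and are redelivered, so the outgoing events may be sent twice. The receiving system must tolerate duplicates, as described above.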
Copyright © 2013-2017 Software AG, Darmstadt, Germany. (Innovation Release)
