Using reliable transports
Reliable messaging gives you the tools to write event-processing applications that are resilient against message loss (for example, due to crashes or network outages).
To make use of reliable messaging in your connectivity plug-ins, you must:
Write EPL to send and receive events via reliable transports and to handle acknowledgments. For detailed information, see the event descriptions for Chain, AckRequired and FlushAck in the com.softwareag.connectivity and com.softwareag.connectivity.control packages in the API Reference for EPL (ApamaDoc).
Note:
Not all transports support reliable messaging.
Reliable-messaging-aware transports only support at-least-once delivery, which admits the possibility of duplicate messages, especially after recovery from downtime. Your applications should be written to handle this.
In this version, reliable messaging with connectivity plug-ins is controlled exclusively from EPL. At this time, reliable messaging cannot be automatically tied in to correlator persistence.
If a license file cannot be found, reliable messaging with connectivity plug-ins is disabled. See Running Apama without a license file.
Transport connectivity plug-in | Supports reliable messaging |
Universal Messaging | No |
MQTT | No |
Digital Event Services | Yes |
HTTP server | No |
HTTP client | No |
Kafka | No |
Cumulocity IoT | No |
Message identifiers
Messages going from the transport to the host contain unique message identifiers. Each identifier is stored as sag.messageId in the metadata. See Metadata values. You only need access to the message identifiers if you want to acknowledge individual events.
Where a message maps to an event type that has the MessageId annotation, the message identifier in the metadata is copied into the field that the annotation names. Do not name a field that you expect to carry a real value from the message, because the message identifier is copied into it. See Adding predefined annotations.
The following EPL example shows how to use the MessageId annotation:
using com.softwareag.connectivity.MessageId;
@MessageId("messageIdentifier")
event MyEvent {
    string s;
    integer i;
    string messageIdentifier; // Contains the sag.messageId from the
                              // message that mapped to this event
}
Chains
In general, when receiving or sending reliably, you need to know which connectivity chain is receiving (from transport to host) or sending (from host to transport) the events. To identify that connectivity chain, you use the EPL Chain event, which provides a wrapper with helpful actions pertaining to reliability. There are two actions on the ConnectivityPlugins event that can be used to get the Chain event for the chain you want:
ConnectivityPlugins.getChainByChannel: This action looks up a chain instance by a channel it is subscribed to or sending to.
ConnectivityPlugins.getChainById: This action looks up a chain instance by its identifier.
See the API Reference for EPL (ApamaDoc) for more information on these actions.
Reliable receiving
A transport can be configured for reliable receiving. Here the events travel from the outside world into the correlator, and the aim is to ensure that they are not lost if a failure occurs.
The EPL that receives the events is obliged to acknowledge when the events have been fully processed by the application. That is because the remote system to which a reliable transport connects typically keeps track of what messages have been acknowledged and what messages have not been acknowledged. In the event of a failure, any messages that have not been acknowledged are resent to you after reconnection/restart.
Keep in mind that fully processed is different from just receiving an event. It means that you have preserved the effect of that event, and done so safely enough that you will no longer need the event to be resent in the event of a failure. As an example, that might mean committing the contents of the event to a database, writing it to a file, or having sent an output event and received an acknowledgment for it.
There are often performance implications for an application that is late with acknowledgments. You should therefore acknowledge all events as soon as possible after receiving. There is no guarantee, however, that an acknowledgment will be processed immediately. For example, if you acknowledge some events in EPL and then the system goes down quite soon afterwards, the events may not have been fully acknowledged to the remote system and will therefore get redelivered.
Once the events have been fully processed by the EPL application, they can be acknowledged in either of the following ways:
Listen for AckRequired events from the com.softwareag.connectivity.control package, and call the ackUpTo action on them. Doing it this way means you are acknowledging potentially large batches of events at a time.
Use the ackUpTo action on the Chain event type to acknowledge all previously received events, up to and including a specific event of your choice. In this case, you identify the specific event with its message identifier. If the event definition has the MessageId annotation, you can obtain the message identifier from the named field.
The detailed technical reasons for choosing between the above mechanisms are given in the descriptions of the Chain and AckRequired events in the API Reference for EPL (ApamaDoc).
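The second mechanism, acknowledging through the Chain event, might look like the following. This is a minimal sketch rather than a definitive implementation. It assumes the MyEvent definition with the MessageId annotation shown earlier, and it assumes that Chain.ackUpTo accepts the message identifier as a string (check the Chain description in the ApamaDoc for the exact signature). The monitor name AckEachEvent and the persist action are hypothetical placeholders for whatever your application does to make an event's effect durable.

```epl
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;

monitor AckEachEvent { // hypothetical monitor name
    action onload() {
        monitor.subscribe("incomingEvents");
        Chain chn := ConnectivityPlugins.getChainByChannel("incomingEvents",
            Direction.TOWARDS_HOST);
        on all MyEvent() as e {
            // Make the effect of the event durable before acknowledging
            persist(e);
            // Assumption: ackUpTo takes the message identifier as a string.
            // Acknowledges this event and all events received before it.
            chn.ackUpTo(e.messageIdentifier);
        }
    }

    // Hypothetical placeholder; a real application would write to a
    // file or a database here
    action persist(MyEvent e) {
        log "Persisted " + e.toString() at INFO;
    }
}
```

Acknowledging after every event keeps the set of events that may be redelivered small, whereas the AckRequired approach acknowledges in potentially larger batches.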
The following EPL example shows how an application can write reliably-received events to a file. It uses a fictional plug-in named filePlugin for this purpose.
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.AckRequired;
...
monitor.subscribe("incomingEvents"); // The chain is sending us events
                                     // on this channel
Chain chn := ConnectivityPlugins.getChainByChannel("incomingEvents",
    Direction.TOWARDS_HOST); // Get the chain itself
on all MyEvent() as e { // The events the application is interested in
    evtSequence.append(e);
}
on all AckRequired(chainId=chn.getId()) as ackRequired {
    // Periodically acknowledge all previously received events,
    // but only after safely writing their contents to a file
    filePlugin.writeAndSync(evtSequence.toString());
    evtSequence.clear();
    ackRequired.ackUpTo();
}
In all reliable receiving, you have to consider the possibility that some events that have already been acknowledged might be resent to your application, especially when recovering after a failure. Your application should be written to either eliminate duplicates or tolerate them.
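One way to eliminate duplicates is to remember which message identifiers have already been processed. The following is a minimal sketch under stated assumptions, not a recommended implementation. It assumes the MyEvent definition with the MessageId annotation shown earlier, and it assumes that the transport redelivers a message with the same sag.messageId as the original delivery, which you should verify for your transport. The monitor name DropDuplicates and the process action are hypothetical.

```epl
monitor DropDuplicates { // hypothetical monitor name
    // Identifiers of events that have already been processed.
    // In this sketch the dictionary grows without bound and is
    // held in memory only, so it is lost if the correlator restarts.
    dictionary<string, boolean> seen;

    action onload() {
        monitor.subscribe("incomingEvents");
        on all MyEvent() as e {
            if seen.hasKey(e.messageIdentifier) then {
                // Assumption: a redelivered message keeps its identifier
                log "Ignoring duplicate " + e.messageIdentifier at DEBUG;
            } else {
                seen.add(e.messageIdentifier, true);
                process(e);
            }
        }
    }

    // Hypothetical placeholder for the application's real processing
    action process(MyEvent e) {
        log "Processing " + e.toString() at INFO;
    }
}
```

Because the set of identifiers lives only in memory, this sketch does not detect duplicates that arrive after a correlator restart. An application that must survive restarts would need to store the identifiers durably, or make its processing idempotent so that duplicates are harmless.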
Any towards-host messages that get dropped by the chain due to errors (for example, when a codec cannot translate from one form to another due to an invalid format) are treated as if they have already been acknowledged.
Reliable sending
Reliable sending is symmetrical with reliable receiving for most purposes. With reliable sending, your EPL application can ask to be notified when a remote system has safely processed your events. As before, you have to know what chain is being used for reliable sending of these events, and so you have to get the relevant Chain instance. After sending some events, you may call the flush() action of the Chain.
Once all events sent to the chain before this call have been safely stored on (or have been processed by) the remote system, your application will see an acknowledgment in the form of a FlushAck event. Your application might then respond to this, for example, by acknowledging reliably-received events that caused these events to be sent, or by recording the fact that the event has been sent in a way that means that the application will not send it again. In the event of a restart, your application should be written such that it is able to resend any events that were sent but not acknowledged in its previous incarnation.
The following is an example of an EPL application that uses the flush() action:
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.FlushAck;
...
// Get the chain for the channel we are sending events to
Chain chn := ConnectivityPlugins.getChainByChannel("chanWeSendTo",
    Direction.TOWARDS_TRANSPORT);
on all wait(0.1) {
    globalInteger := globalInteger + 1;
    MyEvent e := MyEvent(globalInteger);
    send e to "chanWeSendTo";
    // Request a flush and wait for the matching FlushAck
    on FlushAck(requestId=chn.flush()) {
        // All events sent before the flush() call have now been
        // safely processed by the remote system
        log "Fully sent " + e.toString() at INFO;
    }
}
Your application should be written with the idea that it might send duplicate events in the case of a problem (for example, if your application sends out some events which are then processed by some remote system, but there is a crash before your application can see the FlushAck event).
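Reliable sending and reliable receiving can be combined so that a received event is acknowledged only after the event it caused has been safely sent. The following is a minimal sketch of that pattern, assuming the MyEvent definition with the MessageId annotation shown earlier and assuming that Chain.ackUpTo accepts the message identifier as a string. The monitor name ForwardReliably is hypothetical, and the channel names are the ones used in the examples above.

```epl
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.FlushAck;

monitor ForwardReliably { // hypothetical monitor name
    action onload() {
        monitor.subscribe("incomingEvents");
        Chain inChain := ConnectivityPlugins.getChainByChannel(
            "incomingEvents", Direction.TOWARDS_HOST);
        Chain outChain := ConnectivityPlugins.getChainByChannel(
            "chanWeSendTo", Direction.TOWARDS_TRANSPORT);

        on all MyEvent() as e {
            // Forward the received event to the outgoing chain
            send e to "chanWeSendTo";
            // Wait until everything sent so far is safe on the remote
            // system, then acknowledge the event that caused the send
            on FlushAck(requestId=outChain.flush()) {
                // Assumption: ackUpTo takes the message identifier
                inChain.ackUpTo(e.messageIdentifier);
            }
        }
    }
}
```

If a crash occurs after the send but before the FlushAck is seen, the incoming event has not been acknowledged, so it is redelivered and forwarded again. The remote system may then see a duplicate, which is consistent with at-least-once delivery. Requesting a flush after every event is the simplest form of the pattern; flushing after a batch of events would reduce the number of FlushAck events to handle.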