Developing reliable transports
This section explains how to develop transports that support reliable messaging. For information on how to use them, see Using reliable transports.
Reliable messaging uses control messages, which are special messages that are sent between the host and the transport. They signal actions that the host or transport should take, and they acknowledge that those actions have been completed. Control messages have null (Java) or empty (C++) payloads, and store all their information in the metadata.
The type of a control message is stored in a metadata field that can be accessed with the CONTROL_TYPE constant of the com.softwareag.connectivity.Message (Java) or com::softwareag::connectivity::Message (C++) class. The value of this field should be one of the type names listed below. These names are also available as constants on the same class. For more information, see the Message class in the API Reference for Java (Javadoc) or API Reference for C++ (Doxygen).
| Type | Constant | Description |
| --- | --- | --- |
| AckRequired | CONTROL_TYPE_ACK_REQUIRED | This control message is sent from the transport to the host. It is used to ask the host to acknowledge all events that have been sent towards the host before this AckRequired. |
| AckUpTo | CONTROL_TYPE_ACK_UPTO | This control message is sent from the host to the transport, and it is the acknowledgment for the AckRequired control message. It is used to inform the transport that a particular AckRequired request has been fulfilled. |
| Flush | CONTROL_TYPE_FLUSH | This control message is sent from the host to the transport. It is used to ask the transport to acknowledge all events that have been sent towards the transport before this Flush. |
| FlushAck | CONTROL_TYPE_FLUSH_ACK | This control message is sent from the transport to the host, and it is the acknowledgment for the Flush control message. It is used to inform the host that a particular Flush request has been fulfilled. |
The control message metadata also contains fields that can be accessed with the following constants:
* MESSAGE_ID
This constant names a metadata field used for uniquely identifying non-control messages (that is, real events with payloads) that are being sent towards the host. This constant also names a metadata field on the AckRequired and AckUpTo control messages that are used for reliable receiving. In AckRequired, it contains the message identifier of the immediately preceding non-control message. In AckUpTo, it contains the message identifier of the AckRequired that is being responded to.
* REQUEST_ID
This constant names a metadata field on the Flush and FlushAck messages that are used for reliable sending. The field denotes a unique identifier for matching up a Flush with its corresponding FlushAck.
Transports receive and send the above-mentioned control messages. The exact logic of how they should be processed depends on the exact nature of the external system that the transport connects to. More information and examples are provided below.
Note:
The Java examples below are not intended to be used as a starting point. They only illustrate the core concept of handling control messages.
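To make the relationship between the four control types and the two identifier fields concrete, the following is a minimal, library-free sketch. It models metadata as a plain Map rather than using the real Message class, and the three key strings are placeholders invented for this illustration; they are not the actual values behind the CONTROL_TYPE, MESSAGE_ID and REQUEST_ID constants. The class and method names are likewise hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: models control-message metadata with a plain Map. */
final class ControlMessageSketch {
    // Placeholder keys: NOT the real values behind the Message constants.
    static final String CONTROL_TYPE = "example.controlType";
    static final String MESSAGE_ID = "example.messageId";
    static final String REQUEST_ID = "example.requestId";

    private ControlMessageSketch() {}

    /** Metadata of a control message: its type plus the one identifier field that type uses. */
    static Map<String, Object> metadataFor(String type, Object id) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put(CONTROL_TYPE, type);
        metadata.put(identifierKey(type), id);
        return metadata;
    }

    /** AckRequired/AckUpTo carry MESSAGE_ID; Flush/FlushAck carry REQUEST_ID. */
    static String identifierKey(String type) {
        switch (type) {
            case "AckRequired":
            case "AckUpTo":
                return MESSAGE_ID;
            case "Flush":
            case "FlushAck":
                return REQUEST_ID;
            default:
                throw new IllegalArgumentException("Unknown control type: " + type);
        }
    }

    /** Direction in which a control message travels, derived from its type. */
    static String direction(Map<String, Object> metadata) {
        Object type = metadata.get(CONTROL_TYPE);
        if (type == null) {
            return "not a control message";
        }
        switch (type.toString()) {
            case "AckRequired":
            case "FlushAck":
                return "towards host";
            case "AckUpTo":
            case "Flush":
                return "towards transport";
            default:
                return "unknown control type";
        }
    }
}
```

In a real transport, the same decisions would be made against the metadata of a Message, using the constants of the Message class instead of these placeholder strings.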
Writing a transport for reliable receiving
This section describes the obligations of a transport that wishes to see acknowledgments of messages that it is sending towards the host, so that it can pass those acknowledgments to the reliable messaging system that it is connected to. Such a transport must declare its reliability before the plug-in is fully started, and therefore before any messaging takes place. To do this, call the enableReliability function on the PluginHost member of the transport, from either the constructor or the start() method.
public MyReliableTransport(Logger logger, TransportConstructorParameters params)
        throws IllegalArgumentException, Exception
{
    super(logger, params);
    // Declare reliable receiving before the plug-in is fully started.
    host.enableReliability(Direction.TOWARDS_HOST);
}
A transport must place unique identifiers on any non-control messages (that is, real events) that it is sending towards the host. Ideally, these correspond to identifiers provided by the remote messaging system that your code is receiving from. Although this correspondence is not strictly required, it makes tracing a message through the wider system much easier.
MyExternalMessage externalMessage = fictionalRemoteSystem.get();
Message msg = transformToMessage(externalMessage);
// Reuse the remote system's identifier as the unique MESSAGE_ID.
msg.putMetadataValue(Message.MESSAGE_ID,
    externalMessage.getUniqueIdentifier());
hostSide.sendBatchTowardsHost(Collections.singletonList(msg));
A transport decides how often it receives acknowledgments (AckUpTo) from the host application by choosing when to send AckRequired control messages towards the host. In general, space these messages as widely as possible so as not to burden the EPL application, because the steps taken to commit the effects of received events may be quite expensive. However, the frequency of acknowledgments will probably also be constrained by the nature of the remote messaging system your transport is connected to. For example, it may only permit 1,000 unacknowledged messages to be outstanding before blocking receipt of further messages. In this case, send an AckRequired control message after every n real messages, where n is a large fraction of 1,000.
Time is another factor to consider. In the worst case, for example, if acknowledgments are too sparse, a reconnecting application may face 10 minutes of redelivered messages that did not get acknowledged in a previous session. So in general, a transport should make sure to issue AckRequired control messages at least every few seconds, assuming that any non-control messages have been sent towards the host since the last AckRequired.
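The two constraints above (a message-count limit and a time limit) can be combined into a single decision, sketched below. This is an illustration under stated assumptions, not part of the connectivity API: the AckRequiredPolicy class and its method names are hypothetical, and the thresholds are supplied by the caller. The current time is passed in explicitly so that the logic does not depend on a particular clock or timer mechanism.

```java
/** Illustration only: decides when an AckRequired is due, by count or by elapsed time. */
final class AckRequiredPolicy {
    private final int maxMessages;      // for example, a large fraction of the remote system's limit
    private final long maxDelayMillis;  // for example, a few seconds
    private int sentSinceLastAckRequired = 0;
    private long oldestUncoveredMillis = 0;

    AckRequiredPolicy(int maxMessages, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Records one non-control message sent towards the host at the given time. */
    void messageSent(long nowMillis) {
        if (sentSinceLastAckRequired == 0) {
            oldestUncoveredMillis = nowMillis;
        }
        sentSinceLastAckRequired++;
    }

    /** True if an AckRequired should be sent now. */
    boolean ackRequiredDue(long nowMillis) {
        if (sentSinceLastAckRequired == 0) {
            return false; // nothing has been sent since the last AckRequired
        }
        return sentSinceLastAckRequired >= maxMessages
            || nowMillis - oldestUncoveredMillis >= maxDelayMillis;
    }

    /** Call after an AckRequired has been sent towards the host. */
    void ackRequiredSent() {
        sentSinceLastAckRequired = 0;
    }
}
```

With a remote limit of 1,000 unacknowledged messages, a transport might construct this as new AckRequiredPolicy(800, 5000), call messageSent for each event, and check ackRequiredDue both after each send and from a periodic timer. The values 800 and 5000 are illustrative only.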
An AckRequired control message must also contain the message identifier of the preceding non-control message, in order to identify which tranche of previous messages is covered by a corresponding acknowledgment.
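// lastId is the MESSAGE_ID of the most recent non-control message sent towards the host.
Message ackRequired = new Message(null);
ackRequired.putMetadataValue(Message.CONTROL_TYPE,
    Message.CONTROL_TYPE_ACK_REQUIRED);
ackRequired.putMetadataValue(Message.MESSAGE_ID, lastId);
hostSide.sendBatchTowardsHost(Collections.singletonList(ackRequired));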
Finally, a transport should be prepared to act on acknowledgments received from the EPL application, that is, AckUpTo control messages from the host. Each AckUpTo corresponds exactly to a previously issued AckRequired, with both containing the same MESSAGE_ID. AckUpTo messages are seen in the exact same order that the AckRequired messages were issued.
// Requires java.util.Map and com.softwareag.connectivity.Message.
@Override
public void deliverNullPayloadTowardsTransport(Message message)
        throws Exception {
    Map<String, Object> metadata = message.getMetadataMap();
    if (metadata.containsKey(Message.CONTROL_TYPE))
    {
        String controlType = (String)metadata.get(Message.CONTROL_TYPE);
        if (Message.CONTROL_TYPE_ACK_UPTO.equals(controlType))
        {
            // The AckUpTo carries the MESSAGE_ID of the AckRequired it answers.
            String messageId = (String)metadata.get(Message.MESSAGE_ID);
            fictionalRemoteSystem.ackUpToAndIncluding(messageId);
        }
    }
}
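Because each AckUpTo matches exactly one previously issued AckRequired and arrives in the order in which the AckRequired messages were sent, a transport can track outstanding requests with a simple first-in, first-out queue. The following is a minimal sketch of that bookkeeping; the OutstandingAcks class and its methods are hypothetical names introduced for illustration and are not part of the connectivity API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustration only: tracks AckRequired messages that have not yet been answered. */
final class OutstandingAcks {
    private final Deque<String> pending = new ArrayDeque<>();

    /** Records the MESSAGE_ID placed on an AckRequired that was just sent towards the host. */
    void ackRequiredSent(String messageId) {
        pending.addLast(messageId);
    }

    /**
     * Handles an AckUpTo. Returns true and removes the entry if the identifier
     * matches the oldest outstanding AckRequired; returns false otherwise,
     * which would indicate an unexpected or out-of-order acknowledgment.
     */
    boolean ackUpToReceived(String messageId) {
        if (!messageId.equals(pending.peekFirst())) {
            return false;
        }
        pending.removeFirst();
        return true;
    }

    /** Number of AckRequired messages still waiting for an AckUpTo. */
    int outstanding() {
        return pending.size();
    }
}
```

A transport could use the outstanding count to detect a host that has stopped acknowledging, or log a warning when ackUpToReceived returns false.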
Writing a transport for reliable sending
This section describes the obligations of a transport that wishes to reliably acknowledge messages that are being sent to it from an EPL application, that is, from the host. As before, the transport should declare its reliable nature and direction.
public MyReliableTransport(Logger logger, TransportConstructorParameters params)
        throws IllegalArgumentException, Exception
{
    super(logger, params);
    // Declare reliable sending before the plug-in is fully started.
    host.enableReliability(Direction.TOWARDS_TRANSPORT);
}
The transport should be prepared to act on Flush control messages, ensuring that all preceding non-control messages are reliably delivered to a remote reliable messaging system. Once done, the transport should respond with a FlushAck control message towards the host, with a REQUEST_ID set to match it with the corresponding Flush.
Frequent Flush messages are automatically coalesced into individual messages that are more widely spaced. So a transport need not be concerned with the performance impact of responding to every Flush request. Also, Flush messages are subsumed by subsequent Flush messages and their acknowledgments. For example, if a transport receives three Flush messages, a FlushAck corresponding to the final Flush is interpreted as being a response to all three.
@Override
public void deliverNullPayloadTowardsTransport(Message message)
        throws Exception {
    Map<String, Object> metadata = message.getMetadataMap();
    if (metadata.containsKey(Message.CONTROL_TYPE))
    {
        String controlType = (String)metadata.get(Message.CONTROL_TYPE);
        if (Message.CONTROL_TYPE_FLUSH.equals(controlType))
        {
            // Make everything received before this Flush durable in the remote system.
            fictionalRemoteSystem.commitEverythingSoFar();
            Message response = new Message(null);
            response.putMetadataValue(Message.CONTROL_TYPE,
                Message.CONTROL_TYPE_FLUSH_ACK);
            // Echo the REQUEST_ID of the Flush unchanged so that the host can match the FlushAck to it.
            response.putMetadataValue(Message.REQUEST_ID,
                metadata.get(Message.REQUEST_ID));
            hostSide.sendBatchTowardsHost(Collections.singletonList(response));
        }
    }
}
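Because a FlushAck for a later Flush is interpreted as a response to all earlier Flush messages, a transport that commits asynchronously or in batches only needs to remember the most recent request identifier. The sketch below illustrates that idea in isolation. The PendingFlushes class and its method names are hypothetical, and the sketch assumes that the commit performed before calling takeAfterCommit covers every message received before the latest recorded Flush.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: keeps the latest Flush request and acknowledges it once. */
final class PendingFlushes {
    private Object latestRequestId = null;

    /** Records the REQUEST_ID of a Flush; a later Flush replaces an earlier one. */
    void flushReceived(Object requestId) {
        latestRequestId = requestId;
    }

    /**
     * Call after a successful commit to the remote system. Returns the request
     * identifiers that need a FlushAck: the latest one, or nothing if no Flush
     * is pending.
     */
    List<Object> takeAfterCommit() {
        List<Object> toAcknowledge = new ArrayList<>();
        if (latestRequestId != null) {
            toAcknowledge.add(latestRequestId);
            latestRequestId = null;
        }
        return toAcknowledge;
    }
}
```

For example, after three Flush messages with request identifiers 1, 2 and 3 and one commit, takeAfterCommit returns only 3, and a single FlushAck carrying that identifier answers all three requests.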