Sending messages reliably with application flushing notifications
Applications that use BEST_EFFORT reliability to send JMS messages can prevent message loss without using persistent monitors. To do this, each time an application sends a message to the JMS sender channel it also keeps the state required to re-generate the message. Periodically, the application requests the JMS sender to flush a batch of messages to the JMS broker. After all messages in this batch are sent to a JMS broker, the JMS sender sends a flush acknowledgement to the context that requested flushing. When the application receives the flush acknowledgement it executes an application-defined strategy for clearing state associated with the messages that have been sent to the JMS sender channel. This protects the application against failure of the correlator host.
Note:
Messages are still asynchronously sent to the JMS broker even when no flushing has been requested. Requesting a flush simply gives the application the ability to be notified when the messages have been handed off to the JMS broker.
The typical behavior of an application that sends messages reliably without using correlator persistence is as follows:
1. Continuously send messages to the JMS sender channel.
At the same time, the application must keep track of the messages that have been sent to the sender channel but not yet flushed to the JMS broker. These are referred to as outstanding messages.
Also, the applications must reliably keep whatever state is required to re-generate each message. It is important to ensure that the application would not lose data if the outstanding messages were lost due to failure of the correlator node. This is typically achieved by delaying acknowledgement of the incoming JMS messages, Apama events or database/MemoryStore transactions that are generating the sent messages.
2. Request JMS sender to flush outstanding messages.
Periodically, for example, for every 1000 outstanding messages, the application requests that the sender flush the outstanding messages to the JMS broker. This is accomplished by invoking the JMSSender.requestFlush() action. After sending the messages to the JMS broker, this action sends a JMSSenderFlushed acknowledgement event to the context that requested flushing.
The application should set up a listener for the JMSSenderFlushed event whose requestId field is equal to the requestId generated by the requestFlush() action. Also, this listener needs a reference to whatever state corresponds to this batch of outstanding messages. For example, this might be a transaction id.
You must determine how many messages to send before flushing the batch. Flushing each message is not advised as it would add a noticeable performance overhead. However, you do not want to flush messages so infrequently that excessive memory or buffer space is required to hold the state associated with the outstanding messages.
Be sure to implement any required mechanism for downstream receivers to deal with duplicate messages. Typically, an application does this by adding a unique id to each message.
3. Continue sending events to the JMS sender channel.
In many cases, it is fine to continue sending new events to the sender channel while waiting for acknowledgement that previous batches have been flushed. That is, it is okay to have multiple batches in flight to the JMS broker at any one time. This improves throughput but is more complicated to implement. Whether it is possible to have multiple flushes in flight simultaneously in your specific application depends on what the application needs to do when it receives a JMSSenderFlushed acknowledgement event.
4. Application receives a flush notification event.
When the JMS sender has finished processing all events in a batch that is being flushed to a JMS broker, it sends a JMSSenderFlushed event to the context that invoked the requestFlush() action. At this point, the messages are the responsibility of the JMS broker and they are safe from loss even if the correlator or other nodes fail.
The application should now remove any state associated with the messages in this batch. For example, the application can acknowledge the incoming messages that generated the messages sent to the JMS broker, or commit a database or MemoryStore transaction, or send an event that allows some other component to clear associated state from its buffers.
While this feature allows a well-designed application to prevent message loss in the case of a correlator failure, it cannot prevent message loss due to invalid mapping rules or non-existent JMS destinations. Such failures are recorded in the correlator log, but any messages associated with these failures are still included in the next flush acknowledgement, even though sending them to the JMS broker resulted in a failure. This behavior
Prevents failure of one message indefinitely blocking the sending of subsequent messages
Applies only to application bugs that would not benefit from retrying
If a recoverable failure occurs, such as loss of connection to the JMS broker, Apama keeps trying to send the messages until the connection is restored. While this might result in a long delay before the flush acknowledgement can be sent, no messages are lost. The flush acknowledgement is therefore an indication that the message batch has been fully processed by the correlator's JMS sender to the best of its ability. The flush notification is not a guarantee that every message in the batch was successfully delivered to the broker. For example, problems in the application or in the mapping configuration might have prevented successful delivery to the JMS broker.
Sending messages reliably without using correlator persistence is available only for senders that are using BEST_EFFORT reliability mode. Senders that are using AT_LEAST_ONCE or EXACTLY_ONCE reliability mode use the correlator's persistence feature and so have no need for manual send notifications.
A call to the requestFlush() action in a persistent monitor throws an exception. Allowing this call would cause the JMS acknowledgement state to be out of sync with the monitor state after recovery.
The code below provides an example of sending messages reliably with flushing acknowledgements.
using com.apama.correlator.jms.JMSSender;
using com.apama.correlator.jms.JMSConnection;
monitor FlushMessagesToJMSBroker {
. . .
// Each time the application sends an event to the JMS sender
// channel, increment the number of messages sent but not flushed.
send MyEvent() to jmsConnection.getSender("mySender").getChannel();
sendsSinceLastFlush := sendsSinceLastFlush + 1;
if sendsSinceLastFlush = 1000 {
// Stash state needed to re-send messages in case of correlator
// failure. After receiving a flush acknowledgement, this state can
// be cleared. In this example, keep a transaction id for a database.
integer transactionAssociatedWithFlushRequest := currentTransaction;
// Optionally, allow multiple flushes to be in flight concurrently.
currentTransaction := startNewTransaction();
// Request JMS sender to flush messages to the JMS broker.
// Listen for flush acknowledgement event and ensure that state
// that was saved can be cleared when the listener fires.
on JMSSenderFlushed(requestId =
jmsConnection.getSender("mySender").requestFlush()){
commitTransaction(transactionAssociatedWithFlushRequest);
}
}
}
When using sender flushing, an application can optionally set the JMS sender
messageDeliveryMode property to
PERSISTENT. This ensures that the messages are protected from loss by the JMS broker. See
jms:sender properties in
XML configuration bean reference.