Duplicate detection when using JMS
Apama provides an EXACTLY_ONCE receiver reliability setting that allows a finite number of duplicate messages to be detected and dropped before they get to the correlator. This setting can be used to reduce the chance of duplicates; however with JMS, duplicate detection is a complex process. Therefore, customers are strongly encouraged to architect their applications to be tolerant of duplicate messages and use the simpler AT_LEAST_ONCE reliability mode instead of EXACTLY_ONCE when possible.
Configuring duplicate detection is an inexact science given that it depends considerably on the behavior of the sender(s) for a queue, and requires careful architecture and sizing to ensure robust operation in normal use and expected error cases. Moreover, it is not possible to guarantee that duplicate messages will never be seen without an infinite buffer of duplicates. Give particular attention to architectures where multiple sender processes are writing to the same queue, especially if one sender may take over a message from another, failed sender that had already processed and sent that message but had not recorded the fact, and then send it again as a duplicate.
Duplicate detection is a trade-off between probability of an old duplicate not being recognized as such, and the amount of memory and disk required, which will also have an impact on latency and throughput.
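To make the trade-off concrete, the following is a rough sizing sketch rather than an official formula. It assumes the number of remembered ids is bounded by one full per-source window for each sender (dupDetectionPerSourceExpiryWindowSize) plus the ids received during one expiry period (dupDetectionExpiryTimeSecs). The function name, the sender count and the message rate are hypothetical values chosen for illustration.

```python
def estimate_retained_ids(num_sources, per_source_window_size,
                          msgs_per_sec, expiry_time_secs):
    """Rough upper estimate of the ids held by the duplicate detector.

    Assumption: each sender fills its per-source window, and ids leaving
    those windows are kept for roughly one expiry period.
    """
    per_source_ids = num_sources * per_source_window_size
    time_based_ids = msgs_per_sec * expiry_time_secs
    return per_source_ids + time_based_ids


# Hypothetical workload: 4 senders, 500 messages/sec in total,
# with the default window size (2000) and expiry time (120 seconds).
retained = estimate_retained_ids(4, 2000, 500, 120)
print(retained)
```

Doubling the expiry time in this example adds another 60000 ids, which shows why a long time window is the more expensive of the two settings at high message rates.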
Selecting the right value for dupDetectionExpiryTimeSecs is a very important aspect of ensuring that the duplicate detection process will operate reliably: detecting duplicates where necessary without running out of memory when something goes wrong. The expiry time used for the duplicate detector should take into account how the JMS provider will deal with several consecutive process or connection failures on the receive side, especially if the JMS provider temporarily holds back messages for failed connections in an attempt to work around temporary network problems. Be sure to consult the documentation for the JMS provider being used to understand how it handles connection failures.
It is a good idea to conduct tests to see what happens when the connection between the JMS broker and the correlator goes down. When testing, consider using the "rMaxDeliverySecs=" value from the "JMS Status:" line in the correlator log to help understand the minimum expiry time needed to catch redelivered duplicates. Note, however, that this value is only useful if the JMS provider reliably sets the JMSRedelivered flag when performing a redelivery. A good rule of thumb is to use an expiry time of two to three times the broker's redelivery timeout.
Note that although space within the reliable receive (duplicate detection) datastore is reclaimed and reused when older duplicates expire, the file size will not be reduced. There is currently no mechanism for reducing the amount of disk space used by the database, so the on-disk size may grow, bounded by the peak duplicate detector size, but will not shrink.
Messages that are subject to duplicate detection contain:
*uniqueMessageId - an application-level identifier which functions as the key for determining whether a message is functionally equivalent (or identical) to a message already processed, and should therefore be ignored.
*messageSourceId - an optional application-specific string which acts as a key to uniquely identify upstream message senders. This could be a standard GUID (globally unique identifier) string. If provided, the messageSourceId is used to control the expiry of uniqueMessageIds from the duplicate detection cache, allowing dupDetectionPerSourceExpiryWindowSize messages to be kept per messageSourceId. This massively improves the reliability of the duplicate detection while keeping the window size relatively small, since if one sender fails then recovers several hours later, there is no danger of another (non-failed) sender filling up the duplicate detection cache in the meantime and expiring the ids of the first sender causing its duplicates to go undetected.
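The benefit of keeping a window per messageSourceId can be illustrated with a small simulation. This is a simplified sketch, not Apama's implementation: the class names, the window size of 5 and the sender names "A" and "B" are all hypothetical, and the time-based expiry is left out.

```python
from collections import OrderedDict, defaultdict


class SharedWindow:
    """Hypothetical baseline: one bounded window of ids for all senders."""

    def __init__(self, size):
        self.size = size
        self.ids = OrderedDict()

    def is_duplicate(self, source_id, unique_id):
        if unique_id in self.ids:
            return True
        self.ids[unique_id] = None
        if len(self.ids) > self.size:
            self.ids.popitem(last=False)  # evict the oldest id
        return False


class PerSourceWindow:
    """One bounded window of ids per messageSourceId."""

    def __init__(self, size):
        self.size = size
        self.windows = defaultdict(OrderedDict)

    def is_duplicate(self, source_id, unique_id):
        window = self.windows[source_id]
        if unique_id in window:
            return True
        window[unique_id] = None
        if len(window) > self.size:
            window.popitem(last=False)  # evict this sender's oldest id
        return False


def replay(detector):
    # Sender A sends three messages, then fails.
    for seq in range(1, 4):
        detector.is_duplicate("A", f"{seq}:A")
    # Sender B keeps sending while A is down.
    for seq in range(1, 11):
        detector.is_duplicate("B", f"{seq}:B")
    # A recovers and resends its last in-doubt message.
    return detector.is_duplicate("A", "3:A")


shared_caught = replay(SharedWindow(size=5))
per_source_caught = replay(PerSourceWindow(size=5))
print(shared_caught, per_source_caught)
```

With the shared window, sender B's traffic pushes out the ids of sender A, so the resent message is not recognized. With per-source windows, the ids of sender A survive however much sender B sends.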
The key configuration options for duplicate detection are:
*dupDetectionPerSourceExpiryWindowSize - The number of messages that will be kept in each duplicate detection domain per messageSourceId (if messageSourceId is set on each message by the upstream system - messages without a messageSourceId will all be grouped together into one window for the entire dupDetectionDomainId). This property is specified on the global JmsReceiverSettings bean. It is usually configured based on the characteristics of the upstream JMS sender, and the maximum number of in-doubt messages that it might resend in the case of a failure. The default value in this release is 2000. It can be set to 0 to disable the fixed-size per-sender expiry window.
*dupDetectionExpiryTimeSecs - The time for which uniqueMessageIds will be remembered before they expire. This property is specified on the global JmsReceiverSettings bean. The default value in this release is 2 minutes. It can be set to 0 to disable the time-based expiry window (which makes it easier to have a fixed bound on the database size, though this is not an option if the JMS provider itself causes duplicates by redelivering messages after a timeout due to network problems).
*dupDetectionDomainId - An application-specific string which acts as a key to group together receivers that form a duplicate detection domain, for example, a set of receivers that must be able to drop duplicate messages with the same uniqueMessageId (which may be from one, or multiple upstream senders). This property is specified on the jms:receiver bean. By default, the duplicate detection domain is always the same as the JMS destination name and connectionId, so cross-receiver duplicate detection would happen only if multiple receivers in the same connection are concurrently listening to the same queue; duplicates would not be detected if sent to a different queue name, or if sent to the same queue name on a different connection, or if JNDI is used to configure the receiver but the underlying JMS name referenced by the JNDI name changes. Also note that if the message streams processed by each receiver are partitioned using message selectors, the default domain causes unnecessary cross-receiver duplicate detection to be performed. The duplicate detection domain name can be specified on a per-receiver basis to increase, reduce or change the set of receivers across which duplicate detection will be performed. Common values are:
*dupDetectionDomainId=connectionId+":"+jmsDestinationName - the default for queues.
*dupDetectionDomainId=jmsDestinationName - if using receivers to access the same queue from multiple separate connections.
*dupDetectionDomainId=jndiDestinationName - if using JNDI to configure receiver names, and needing the ability to change the queue or topic that the JNDI name points to.
*dupDetectionDomainId=connectionId+":"+receiverId - the default for topics; also used if each receiver should check for duplicates independently of other receivers. This is useful if receivers are already using message selectors to partition the message stream, which implies that cross-receiver duplicates are not possible.
*dupDetectionDomainId=<application-defined-name> - if using multiple receivers per selector-partitioned message stream. The name is likely to be related to the message selector expression.
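The effect of the default domain values can be sketched with a small helper that builds the strings listed above. The function name and the connection, destination and receiver names are hypothetical; the code only illustrates which receivers end up in the same domain, not how Apama computes the value internally.

```python
def default_domain_id(connection_id, destination_name, receiver_id, is_topic):
    """Build the default domain string: per destination for queues,
    per receiver for topics (as in the list of common values)."""
    if is_topic:
        return f"{connection_id}:{receiver_id}"
    return f"{connection_id}:{destination_name}"


# Two receivers on the same connection and queue share a domain by default.
q1 = default_domain_id("conn1", "ordersQueue", "recv1", is_topic=False)
q2 = default_domain_id("conn1", "ordersQueue", "recv2", is_topic=False)

# A receiver on a different connection does not share it, so duplicates
# across the two connections would go undetected by default.
q3 = default_domain_id("conn2", "ordersQueue", "recv3", is_topic=False)

# Overriding with the destination name alone puts all three in one domain.
override_domain = "ordersQueue"

# Topic receivers each get their own domain by default.
t1 = default_domain_id("conn1", "pricesTopic", "recv4", is_topic=True)
t2 = default_domain_id("conn1", "pricesTopic", "recv5", is_topic=True)

print(q1, q3, t1, t2)
```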
Duplicate detection only works if the upstream JMS sender has specified a uniqueMessageId for each message (the uniqueMessageId is typically set as a message property, but could alternatively be embedded within the message body if the mapper is configured to extract it). Any messages that do not have this identifier will not be subject to duplicate detection. The uniqueMessageId string is expected to be unique across all messages within the configured dupDetectionDomainId (for example, queue), including messages with different messageSourceIds. By default, sent JMS messages would have a uniqueMessageId of seqNo:messageSourceId, where seqNo is a contiguous sequence number that is unique for the sender, for example:
uniqueMessageId=1:mymachinename1.domain:1234:567890:S01
uniqueMessageId=2:mymachinename1.domain:1234:567890:S01
uniqueMessageId=3:mymachinename1.domain:1234:567890:S01
uniqueMessageId=1:mymachinename2.domain:4321:987654:S01
uniqueMessageId=2:mymachinename2.domain:4321:987654:S01
uniqueMessageId=1:mymachinename2.domain:4321:987654:S02
uniqueMessageId=2:mymachinename2.domain:4321:987654:S02
...
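As an illustration of the seqNo:messageSourceId format shown above, a sender-side generator might look like the following sketch. The function names are hypothetical, and the parsing helper assumes the default format in which everything after the first colon is the messageSourceId. A custom sender is free to use any other unique string.

```python
import itertools


def unique_message_ids(message_source_id):
    """Yield ids of the form seqNo:messageSourceId, with seqNo starting at 1."""
    for seq_no in itertools.count(1):
        yield f"{seq_no}:{message_source_id}"


def split_unique_message_id(unique_message_id):
    """Split a default-format id at the first colon into (seqNo, messageSourceId)."""
    seq_no, _, source_id = unique_message_id.partition(":")
    return int(seq_no), source_id


sender = unique_message_ids("mymachinename1.domain:1234:567890:S01")
first_three = [next(sender) for _ in range(3)]
print(first_three)
print(split_unique_message_id(first_three[2]))
```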
To reliably perform duplicate detection when multiple senders write to the same queue, the upstream senders should be configured to send a globally-unique messageSourceId identifying the message source/sender, and this messageSourceId should also be configured in the mapping layer of the receiver. Without it, the Apama receiver would have to configure a very large, and therefore costly, time window to prevent premature expiry of ids from a sender that fails, produces no messages for a while, and then recovers, possibly sending duplicates as it does so.
Apama's duplicate detection involves a set of fixed-size per-sourceId queues, and when the queue is full the oldest items are expired to a shared queue ordered by timestamp (time received by the correlator's JMS receiver) whose items are expired based on a time window. So the receiver settings controlling duplicate detection window sizes are:
*dupDetectionPerSourceExpiryWindowSize
*dupDetectionExpiryTimeSecs
uniqueMessageIds are expired from the per-source queue (and moved to the time-based queue) when the queue is full of newer ids, or when a newer message is received whose uniqueMessageId is already in the queue for that source.
uniqueMessageIds are expired from the time-based queue (and removed from the database permanently) when they are older than the newest item in the time-based queue by more than dupDetectionExpiryTimeSecs.
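The two-level expiry described above can be modeled in a few lines. This is a minimal in-memory sketch under stated assumptions, not Apama's implementation: the class and method names are hypothetical, both settings are assumed to be greater than zero, an id moved out of a per-source queue is assumed to keep its original receive time, and the real detector persists its state to a datastore.

```python
import heapq
from collections import OrderedDict


class TwoTierDuplicateDetector:
    def __init__(self, per_source_window_size, expiry_time_secs):
        self.window_size = per_source_window_size
        self.expiry_secs = expiry_time_secs
        self.per_source = {}     # source id -> OrderedDict(unique id -> received time)
        self.time_heap = []      # min-heap of (received time, unique id)
        self.time_ids = set()    # ids currently held in the time-based queue
        self.newest_time = None  # newest timestamp placed in the time-based queue

    def _move_to_time_queue(self, unique_id, received_time):
        heapq.heappush(self.time_heap, (received_time, unique_id))
        self.time_ids.add(unique_id)
        if self.newest_time is None or received_time > self.newest_time:
            self.newest_time = received_time
        # Drop ids older than the newest time-queue item by more than the expiry time.
        while self.time_heap and self.newest_time - self.time_heap[0][0] > self.expiry_secs:
            _, expired_id = heapq.heappop(self.time_heap)
            self.time_ids.discard(expired_id)

    def is_duplicate(self, unique_id, source_id, received_time):
        window = self.per_source.setdefault(source_id, OrderedDict())
        if unique_id in window:
            # A repeat of an id in the per-source queue moves that id to the time queue.
            self._move_to_time_queue(unique_id, window.pop(unique_id))
            return True
        if unique_id in self.time_ids:
            return True
        window[unique_id] = received_time
        if len(window) > self.window_size:
            # The per-source queue is full: expire its oldest id to the time queue.
            oldest_id, oldest_time = window.popitem(last=False)
            self._move_to_time_queue(oldest_id, oldest_time)
        return False


detector = TwoTierDuplicateDetector(per_source_window_size=2, expiry_time_secs=120)
events = [("1:S01", 0), ("2:S01", 10), ("3:S01", 20), ("1:S01", 30),
          ("4:S01", 200), ("5:S01", 210), ("6:S01", 220), ("1:S01", 230)]
results = [detector.is_duplicate(uid, "S01", t) for uid, t in events]
print(results)
```

In this run the repeat of 1:S01 at time 30 is caught through the time-based queue. The repeat at time 230 is missed, because by then an id received at time 200 has entered the time-based queue and everything more than 120 seconds older than it has been dropped.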