DataGroup Conflation Attributes
Enabling Conflation on DataGroups
Universal Messaging DataGroups can be configured so that conflation (merging and throttling of events) occurs when messages are published. Conflation can be carried out in several ways, which are specified using an nConflationAttributes object. The nConflationAttributes object is passed to the DataGroup when it is first created.
The nConflationAttributes object has two properties, action and interval. Both are passed into the constructor.
The action property specifies whether published events should replace previous events in the DataGroup or be merged with them. The possible actions are defined by static fields:
nConflationAttributes.sMergeEvents
nConflationAttributes.sDropEvents
The interval property specifies the interval in milliseconds between event fanout to subscribers. An interval of zero implies events will be fanned out immediately.
Creating a Conflation Attributes Object
//ConflationAttributes specifying merge events and no throttled delivery
nConflationAttributes confattribs =
new nConflationAttributes(nConflationAttributes.sMergeEvents, 0);
//ConflationAttributes specifying merge events and throttled delivery at 1 second intervals
nConflationAttributes confattribs =
new nConflationAttributes(nConflationAttributes.sMergeEvents, 1000);
//ConflationAttributes specifying drop events and throttled delivery at 1 second intervals
nConflationAttributes confattribs =
new nConflationAttributes(nConflationAttributes.sDropEvents, 1000);
Create a Single nDataGroup with Conflation Attributes
public class datagroupListener implements nDataGroupListener {
    nSession mySession;
    nDataGroup myDataGroup;

    public datagroupListener(nSession session, nConflationAttributes confattribs,
            String dataGroupName) throws Exception {
        mySession = session;
        //create a DataGroup passing in this class as a nDataGroupListener and a
        //ConflationAttributes
        myDataGroup = mySession.createDataGroup(dataGroupName, this, confattribs);
    }

    //nDataGroupListener callback methods omitted for brevity
}
Create Multiple nDataGroups with Conflation Attributes
nConflationAttributes confattribs =
new nConflationAttributes(nConflationAttributes.sMergeEvents, 1000);
String[] groups = {"myFirstGroup", "mySecondGroup"};
nDataGroup[] myGroups = mySession.createDataGroups(groups, confattribs);
Publishing Events to Conflated DataGroups With A Merge Policy
At this point the server will have an nDataGroup created with the ability to merge incoming events from registered events. The next step is to create the registered events at the publisher.
nRegisteredEvent audEvent = myDataGroup.createRegisteredEvent();
nEventProperties props = audEvent.getProperties();
props.put("bid", 0.8999);
props.put("offer", 0.9999);
props.put("close", "0.8990");
audEvent.commitChanges();
You now have an nRegisteredEvent called audEvent, which is bound to the data group (which could, for example, be called 'aud'). After setting the properties relevant to the application, call commitChanges(); this sends the event, as is, to the server. If the bid then changes, that individual field can be published to the server as follows:
props.put("bid", 0.9999);
audEvent.commitChanges();
This code will send only the new "bid" change to the server. The server will modify the event internally so that any new client subscribing will receive all of the data, yet any existing subscribers will only receive the change.
When a data group has been created with Merge conflation, all registered events published to that data group will have their nEventProperties merged into the snapshot event, before the delta event is delivered to the consumers.
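The snapshot and delta behaviour described above can be illustrated with a minimal, self-contained sketch. It does not use the Universal Messaging API: the class MergeSnapshotSketch and its methods are hypothetical stand-ins, and a plain map is assumed as a model of the nEventProperties held by the server.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a merge-conflated data group (not a Universal Messaging class).
class MergeSnapshotSketch {
    // Latest value of every field published so far (the "snapshot event").
    private final Map<String, Object> snapshot = new LinkedHashMap<>();

    // Merges the changed fields into the snapshot and returns the delta
    // that existing subscribers would receive.
    Map<String, Object> publish(Map<String, Object> changedFields) {
        snapshot.putAll(changedFields);
        return new LinkedHashMap<>(changedFields);
    }

    // Copy of the full state that a newly added subscriber would receive first.
    Map<String, Object> snapshot() {
        return new LinkedHashMap<>(snapshot);
    }

    public static void main(String[] args) {
        MergeSnapshotSketch group = new MergeSnapshotSketch();

        // First commit: every field is sent.
        Map<String, Object> initial = new LinkedHashMap<>();
        initial.put("bid", 0.8999);
        initial.put("offer", 0.9999);
        initial.put("close", "0.8990");
        group.publish(initial);

        // Second commit: only the changed field is sent.
        Map<String, Object> update = new LinkedHashMap<>();
        update.put("bid", 0.9999);
        Map<String, Object> delta = group.publish(update);

        System.out.println("existing subscribers receive: " + delta);
        System.out.println("new subscribers receive: " + group.snapshot());
    }
}
```

In this sketch an existing subscriber sees only the bid field after the second publish, while the snapshot still holds bid, offer and close.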
When using Merge conflation with an interval (i.e. throttling), all updates will be merged into a conflated event (as well as the snapshot event) that will be delivered within the chosen interval. For example, consider the following with a merge-conflated group and an interval set to 100ms (i.e. a maximum of 10 events a second):
Scenario 1
t0 - Publish Message1, Bid=1.234 (This message will be immediately
delivered, and merged into the snapshot)
t10 - Publish Message2, Offer=1.234 (This message will be held as a
conflation event, and merged into the snapshot)
t20 - Publish Message3, Bid=1.345 (This message will be merged with the
conflated event, and with the snapshot)
t100 - Interval hit (Conflated event containing Offer=1.234,Bid=1.345
is delivered to consumers)
Interval timer reset to +100ms, i.e. t200
t101 - Publish Message4, Offer=1.345 (This message will be held as a conflation event,
and merged into the snapshot)
Where t0...tn is the time frame in milliseconds from now.
Scenario 2
t0 - Publish Message1, Bid=1.234 (This message will be immediately
delivered, and merged into the snapshot)
t100 - Interval hit (Nothing is sent as there has been no update
since t0)
t101 - Publish Message2, Offer=1.234 (This message will be immediately
delivered, and merged into the snapshot)
Interval timer reset to +100ms, i.e. t201
Meanwhile, if any new consumers are added to the Data Group, they will always consume the most up to date snapshot and then begin consuming any conflated updates after that.
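The two scenarios can be replayed with a small simulation. This is a sketch under stated assumptions, not the server's implementation: ThrottledMergeSketch is a hypothetical class, time is a logical millisecond value supplied by the caller rather than a real timer, and an interval hit that falls on the same millisecond as a publish is assumed to be processed first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of merge conflation with an interval (not a Universal Messaging class).
class ThrottledMergeSketch {
    private final long intervalMs;
    private final Map<String, Object> snapshot = new LinkedHashMap<>();
    private final Map<String, Object> pending = new LinkedHashMap<>(); // conflated event
    private final List<String> deliveries = new ArrayList<>();
    private long nextFanout = -1; // -1 means no interval timer is running

    ThrottledMergeSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Processes every interval hit that falls at or before logical time t.
    void advanceTo(long t) {
        while (nextFanout >= 0 && nextFanout <= t) {
            if (pending.isEmpty()) {
                nextFanout = -1; // nothing to send, so the next publish goes out immediately
            } else {
                deliveries.add("t" + nextFanout + " " + pending);
                pending.clear();
                nextFanout += intervalMs; // interval timer reset
            }
        }
    }

    // Publishes a single changed field at logical time t.
    void publish(long t, String field, Object value) {
        advanceTo(t);
        snapshot.put(field, value); // every update is merged into the snapshot
        if (nextFanout < 0) {
            deliveries.add("t" + t + " {" + field + "=" + value + "}");
            nextFanout = t + intervalMs;
        } else {
            pending.put(field, value); // held and merged into the conflated event
        }
    }

    List<String> deliveries() {
        return new ArrayList<>(deliveries);
    }

    Map<String, Object> snapshot() {
        return new LinkedHashMap<>(snapshot);
    }

    long nextFanout() {
        return nextFanout;
    }

    public static void main(String[] args) {
        ThrottledMergeSketch scenario1 = new ThrottledMergeSketch(100);
        scenario1.publish(0, "Bid", 1.234);
        scenario1.publish(10, "Offer", 1.234);
        scenario1.publish(20, "Bid", 1.345);
        scenario1.publish(101, "Offer", 1.345);
        System.out.println("Scenario 1 deliveries: " + scenario1.deliveries());

        ThrottledMergeSketch scenario2 = new ThrottledMergeSketch(100);
        scenario2.publish(0, "Bid", 1.234);
        scenario2.publish(101, "Offer", 1.234);
        System.out.println("Scenario 2 deliveries: " + scenario2.deliveries());
    }
}
```

In both runs the snapshot always reflects the latest value of every field, which is what a newly added consumer would receive before any conflated updates.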
Publishing Events to Conflated DataGroups With A Drop Policy
If you have specified a "Drop" policy in your nConflationAttributes, then events are published in the normal way rather than by using nRegisteredEvent.
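As a rough illustration of what a Drop policy means for throttled delivery, the sketch below models a group in which each newly published event replaces the one still waiting for fanout. It is a simplified model and does not use the Universal Messaging API: DropConflationSketch, publish, onInterval and droppedCount are hypothetical names, and the assumption that an event is only released on an interval hit is made for brevity.

```java
import java.util.Optional;

// Hypothetical model of drop conflation (not a Universal Messaging class).
class DropConflationSketch<E> {
    private E held;      // most recent event published since the last fanout
    private int dropped; // number of held events replaced before they were delivered

    // A new event replaces any event still waiting for fanout.
    void publish(E event) {
        if (held != null) {
            dropped++;
        }
        held = event;
    }

    // Called on each interval hit: releases the latest whole event, if there is one.
    Optional<E> onInterval() {
        Optional<E> out = Optional.ofNullable(held);
        held = null;
        return out;
    }

    int droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        DropConflationSketch<String> group = new DropConflationSketch<>();
        group.publish("bid=1.234");
        group.publish("bid=1.240");
        group.publish("bid=1.251");

        System.out.println("delivered on interval: " + group.onInterval().orElse("nothing"));
        System.out.println("events dropped: " + group.droppedCount());
        System.out.println("next interval: " + group.onInterval().orElse("nothing"));
    }
}
```

Unlike the merge case, the delivered event is a complete event rather than a set of changed fields, so each published event should carry everything a consumer needs.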
Consuming Conflated Events from a DataGroup
The subscriber doesn't need to do anything different to receive events from a DataGroup with conflation enabled. If nRegisteredEvents are being delivered, then the delivered events will contain only the fields that have changed. In all other circumstances an entire event is delivered to all consumers.