Asynchronous Transactional Queue Consumption
Asynchronous transactional queue consumers consume events from a callback on an interface that all asynchronous consumers must implement. We call this interface an nEventListener. The listener interface defines one method called go which when called will pass events to the consumer as they are delivered from the Universal Messaging Realm Server.
Transactional queue consumers have the ability to notify the server when events have been consumed (committed) or when they have been discarded (rolled back). This ensures that the server does not remove events from the queue unless notified by the consumer with a commit or rollback.
An example of a transactional asynchronous queue reader is shown below:
public class myAsyncTxQueueReader : nEventListener {
nQueueAsyncTransactionalReader reader = null;
nQueue myQueue = null;
public myAsyncTxQueueReader() {
// construct your session and queue objects here
// begin consuming events from the queue
nQueueReaderContext ctx = new nQueueReaderContext(this, 10);
reader = myQueue.createAsyncTransactionalReader(ctx);
}
public void go(nConsumeEvent event) {
Console.WriteLine("Consumed event "+event.getEventID());
reader.commit();
}
public static void Main(String[] args) {
new myAsyncTxQueueReader();
}
}
As previously mentioned, the big difference between a transactional asynchronous reader and a standard asynchronous queue reader is that once events are consumed by the reader, the consumers need to commit the events consumed. Events will only be removed from the queue once the commit has been called.
Developers can also call the rollback() method on a transactional reader that will notify the server that any events delivered to the reader that have not been committed, will be rolled back and redelivered to other queue consumers. Transactional queue readers can also commit or rollback any specific event by passing the event id of the event into the commit or rollback calls. For example, if a reader consumes 10 events, with Event IDs 0 to 9, you can commit event 4, which will only commit events 0 to 4 and rollback events 5 to 9.
Subscription with a Filtering Selector
Asynchronous queue consumers can also be created using a selector, which allows the subscription to be filtered based on event properties and their values.
For example, assume some events are being published with the following event properties:
nEventProperties props = new nEventProperties();
props.put("BONDNAME", "bond1");
A developer can create a message selector string such as:
String selector = "BONDNAME='bond1'";
Passing this string into the constructor for the nQueueReaderContext object shown in the example code will ensure that the subscriber will only consume messages that contain the correct value for the event property BONDNAME.