Synchronous Transactional Queue Consuming
Synchronous queue consumers consume events by calling pop() on the Universal Messaging queue reader object. Each pop call made on the queue reader will synchronously retrieve the next event from the queue.
Transactional queue consumers have the ability to notify the server when events have been consumed (committed) or when they have been discarded (rolled back). This ensures that the server does not remove events from the queue unless notified by the consumer with a commit or rollback.
An example of a transactional synchronous queue reader is shown below:
class mySyncTxQueueReader{
nQueueSyncTransactionReader *reader = null;
nQueue *myQueue = null;
public:
mySyncTxQueueReader(){
// construct your session and queue objects here
// construct the transactional queue reader
nQueueReaderContext *ctx = new nQueueReaderContext();
reader = myQueue->createTransactionalReader(ctx);
}
void start(){
while (true) {
// pop events from the queue
nConsumeEvent *event = reader->pop();
go(event);
// commit each event consumed
reader->commit(event->getEventID());
}
}
void go(nConsumeEvent *event) {
printf("Consumed event %d",event->getEventID());
}
int main(int argc, char** argv) {
new mySyncTxQueueReader();
sqr->start();
return 0;
}
}
As previously mentioned, the big difference between a transactional synchronous reader and a standard synchronous queue reader is that once events are consumed by the reader, the consumers need to commit the events consumed. Events will only be removed from the queue once the commit has been called.
Developers can also call the .rollback() method on a transactional reader that will notify the server that any events delivered to the reader that have not been committed, will be rolled back and redelivered to other queue consumers. Transactional queue readers can also commit or rollback any specific event by passing the event id of the event into the commit or rollback calls. For example, if a reader consumes 10 events, with event id's 0 to 9, you can commit event 4, which will only commit events 0 to 4 and rollback events 5 to 9.
Synchronous queue consumers can also be created using a selector, which defines a set of event properties (see
Event Dictionaries) and their values that a consumer is interested in. For example if events are being published with the following event properties:
nEventProperties props =new nEventProperties();
props->put("BONDNAME","bond1");
If you then provide a message selector string in the form of:
std::string selector = "BONDNAME='bond1'";
And pass this string into the constructor for the nQueueReaderContext object shown in the example code, then your consumer will only consume messages that contain the correct value for the event property BONDNAME.
An example of a synchronous queue consumer can be found on the examples page under "Queue Reader".