Event Fragmentation on Queues
By default, Universal Messaging only allows an event to be published if its size is less than 1 MB. Although this limit can be changed in the Enterprise Manager (see the Config tab, FanoutValues/MaxBufferSize), doing so is not generally recommended; it is usually far more efficient to fragment large events into smaller chunks for publishing.
Universal Messaging can transparently fragment and reconstruct events. A developer therefore needs only a single method call to fragment and publish an event, and the consumer receives the event already reconstructed. Under the hood, however, Universal Messaging publishes several smaller messages that together represent the large event.
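The idea of splitting a payload into bounded chunks and joining them again can be illustrated without the library. The following is a minimal sketch only: it is not the Universal Messaging implementation, and the names FragmentSketch, Split and Reassemble are hypothetical. It assumes the chunks arrive complete and in order.

```csharp
using System;
using System.Collections.Generic;

public static class FragmentSketch
{
    // Split a payload into consecutive chunks of at most chunkSize bytes.
    // (Hypothetical helper; not part of the Universal Messaging API.)
    public static List<byte[]> Split(byte[] payload, int chunkSize)
    {
        if (payload == null) throw new ArgumentNullException(nameof(payload));
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var chunks = new List<byte[]>();
        for (int offset = 0; offset < payload.Length; offset += chunkSize)
        {
            // The final chunk may be shorter than chunkSize.
            int length = Math.Min(chunkSize, payload.Length - offset);
            var chunk = new byte[length];
            Array.Copy(payload, offset, chunk, 0, length);
            chunks.Add(chunk);
        }
        return chunks;
    }

    // Concatenate the chunks, in the order given, back into one payload.
    public static byte[] Reassemble(IReadOnlyList<byte[]> chunks)
    {
        if (chunks == null) throw new ArgumentNullException(nameof(chunks));

        int total = 0;
        foreach (var c in chunks) total += c.Length;

        var payload = new byte[total];
        int offset = 0;
        foreach (var c in chunks)
        {
            Array.Copy(c, 0, payload, offset, c.Length);
            offset += c.Length;
        }
        return payload;
    }
}
```

Calling FragmentSketch.Split on a 120000-byte array with a chunk size of 50000 yields three chunks of 50000, 50000 and 20000 bytes, and passing those chunks to Reassemble returns a byte-for-byte copy of the original array.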
A summary of the code needed to publish and consume fragmented events is provided below.
Publishing
The code to publish a large event using fragmentation is as follows:

// The chunk_size is the max size (bytes) for each event. Multiple events will
// be published of size chunk_size until the entire event has been sent.
int chunk_size = 50000;

nConsumeEventFragmentWriter fw = new nConsumeEventFragmentWriter(myQueue, chunk_size);

// Rather than myQueue.publish(evt), we let the fragment writer handle the publish
fw.publish(evt);
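When choosing a chunk size, it can help to estimate how many smaller events a single large event will turn into. The sketch below is plain arithmetic under the assumption that each fragment carries at most chunk_size bytes of payload; it does not account for any per-event overhead the server may add, and FragmentMath and its methods are hypothetical names, not part of the Universal Messaging API.

```csharp
using System;

public static class FragmentMath
{
    // Number of chunks needed to carry eventSize bytes (ceiling division).
    public static long FragmentCount(long eventSize, int chunkSize)
    {
        if (eventSize < 0) throw new ArgumentOutOfRangeException(nameof(eventSize));
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        return (eventSize + chunkSize - 1) / chunkSize;
    }

    // Size in bytes of the final chunk; 0 for an empty payload.
    public static long LastFragmentSize(long eventSize, int chunkSize)
    {
        long count = FragmentCount(eventSize, chunkSize);
        return count == 0 ? 0 : eventSize - (count - 1) * chunkSize;
    }
}
```

With the chunk size of 50000 used above, a 5000000-byte event maps to 100 fragments, while a 5000001-byte event maps to 101 fragments, the last of which carries a single byte.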
Subscribing
There are various approaches to consuming fragmented events from queues:
Asynchronous Queue Consumer

public class myAsyncQueueReader : nEventListener {

    nQueue myQueue = null;

    public myAsyncQueueReader() {
        // construct your session and queue objects here
        // begin consuming events from the queue
        nConsumeEventFragmentReader cefr = new nConsumeEventFragmentReader(this);
        nQueueReaderContext ctx = new nQueueReaderContext(cefr, 10);
        // Assumption: the original omits the reader creation; createAsyncReader
        // is used here by analogy with the transactional example below.
        myQueue.createAsyncReader(ctx);
    }

    // "event" is a reserved word in C#, so the parameter is named evt
    public void go(nConsumeEvent evt) {
        Console.WriteLine("Consumed event " + evt.getEventID());
    }

    public static void Main(String[] args) {
        new myAsyncQueueReader();
    }
}
Asynchronous Transactional Queue Consumer

public class myAsyncTxQueueReader : nEventListener {

    nQueueAsyncTransactionReader reader = null;
    nQueue myQueue = null;

    public myAsyncTxQueueReader() {
        // construct your session and queue objects here
        // begin consuming events from the queue
        nConsumeEventFragmentReader cefr = new nConsumeEventFragmentReader(this);
        nQueueReaderContext ctx = new nQueueReaderContext(cefr, 10);
        reader = myQueue.createAsyncTransactionalReader(ctx);
    }

    // "event" is a reserved word in C#, so the parameter is named evt
    public void go(nConsumeEvent evt) {
        Console.WriteLine("Consumed event " + evt.getEventID());
        // commit the events consumed so far
        reader.commit();
    }

    public static void Main(String[] args) {
        new myAsyncTxQueueReader();
    }
}
Synchronous Queue Consumer

public class mySyncQueueReader {

    nQueueSyncReader reader = null;
    nQueue myQueue = null;

    public mySyncQueueReader() {
        // construct your session and queue objects here
        // construct the queue reader
        nConsumeEventFragmentReader cefr = new nConsumeEventFragmentReader(this);
        nQueueReaderContext ctx = new nQueueReaderContext(cefr, 10);

        reader = myQueue.createFragmentReader(ctx);
    }

    public void start() {
        while (true) {
            // pop events from the queue
            nConsumeEvent evt = reader.pop();
            go(evt);
        }
    }

    // "event" is a reserved word in C#, so the parameter is named evt
    public void go(nConsumeEvent evt) {
        Console.WriteLine("Consumed event " + evt.getEventID());
    }

    public static void Main(String[] args) {
        mySyncQueueReader sqr = new mySyncQueueReader();
        sqr.start();
    }

}

Synchronous Transactional Queue Consumer

public class mySyncTxQueueReader {

    nQueueSyncTransactionReader reader = null;
    nQueue myQueue = null;

    public mySyncTxQueueReader() {
        // construct your session and queue objects here
        // construct the transactional queue reader
        nConsumeEventFragmentReader cefr = new nConsumeEventFragmentReader(this);
        nQueueReaderContext ctx = new nQueueReaderContext(cefr, 10);

        reader = myQueue.createTransactionalFragmentReader(ctx);
    }

    public void start() {
        while (true) {
            // pop events from the queue
            nConsumeEvent evt = reader.pop();
            go(evt);
            // commit each event consumed
            reader.commit(evt.getEventID());
        }
    }

    // "event" is a reserved word in C#, so the parameter is named evt
    public void go(nConsumeEvent evt) {
        Console.WriteLine("Consumed event " + evt.getEventID());
    }

    public static void Main(String[] args) {
        mySyncTxQueueReader sqr = new mySyncTxQueueReader();
        sqr.start();
    }
}

Copyright © 2013-2015 | Software AG, Darmstadt, Germany and/or Software AG USA, Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.