Using Durable Subscriptions with Multiple Clients
Window/Prefetch Size of Shared and Serial Durable Subscriptions
The Universal Messaging client API offers the option of specifying a window size (for asynchronous clients) or a prefetch size (for synchronous clients) for a shared durable subscription. The window/prefetch size can be set independently for synchronous and asynchronous subscribers that use shared durable subscriptions. The client API contains two methods that accept the window/prefetch size as a parameter: one for creating an iterator for a channel and one for adding a new subscriber with an nEventListener.
When you specify a window/prefetch size, the session does not auto-acknowledge events by default. This means that when the window size is reached, an asynchronous client stops receiving further events. When an iterator with a defined prefetch size is used by a synchronous client, the server returns the requested number of events, and it is up to the synchronous client whether to commit, roll back, or prefetch the next set of events.
If the window size parameter is not specified for an asynchronous subscriber, the value set with nConstants.setMaxUnackedEvents(x) is used as the default client window size, provided it was set before subscribing with listeners. If the durable subscription is not a shared durable subscription, an nIllegalArgumentException is thrown. If you specify a negative window size, the default value stored in the server configuration is used instead; this default is the "ClientQueueWindow" property, which is available in the "Cluster Config" group.
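For example, here is a minimal sketch of relying on the client-wide default instead of a per-subscriber window size. It assumes an addSubscriber overload that omits the window size parameter, an illustrative value of 100, and the channel, listener, and sharedDurable objects created as in Example 1 below:
// Set the default client window size before subscribing with listeners.
nConstants.setMaxUnackedEvents(100);
// Subscribe without an explicit window size; the value set above is used instead.
// (Sketch: assumes an addSubscriber overload that omits the window size parameter.)
channel.addSubscriber(listener, sharedDurable, null);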
Example 1
You can set the size of an asynchronous client window using a method from the public nChannel API: channel.addSubscriber(nEventListener nel, nDurable name, String selector, int windowSize). A different window size can be configured for every asynchronous subscriber. Here is a simple code snippet for this:
nSessionAttributes sessionAttr = new nSessionAttributes("nsp://localhost:9000");
nSession session = nSessionFactory.create(sessionAttr);
session.init();
nChannelAttributes channelAttr = new nChannelAttributes("channel");
nChannel channel = session.createChannel(channelAttr);
boolean isPersistent = true;
boolean isClusterWide = false;
int startEventId = 0;
nDurableAttributes attr =
    nDurableAttributes.create(nDurableAttributes.nDurableType.Shared,
        "shared-durable");
attr.setPersistent(isPersistent);
attr.setClustered(isClusterWide);
attr.setStartEID(startEventId);
nDurable sharedDurable = null;
// Need to check first in case the shared durable already exists!
try {
    sharedDurable = channel.getDurableManager().get(attr.getName());
} catch (nNameDoesNotExistException ex) {
    sharedDurable = channel.getDurableManager().add(attr);
}
// Asynchronous listener that consumes events from the shared durable.
nEventListener listener = new nEventListener() {
    @Override
    public void go(nConsumeEvent evt) {
        System.out.println("Consumed event " + evt.getEventID());
    }
};
int windowSize = 4;
channel.addSubscriber(listener, sharedDurable, null, windowSize);
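Because the session does not auto-acknowledge events, the listener above stops receiving events once four unacknowledged events are outstanding. Here is a minimal sketch of a listener that acknowledges each event as it is processed, assuming events can be acknowledged individually with nConsumeEvent.ack():
nEventListener ackingListener = new nEventListener() {
    @Override
    public void go(nConsumeEvent evt) {
        System.out.println("Consumed event " + evt.getEventID());
        try {
            // Acknowledge the event so it no longer counts against the window.
            // (Sketch: assumes nConsumeEvent.ack().)
            evt.ack();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};
// A further consumer can attach to the same shared durable with its own window size.
channel.addSubscriber(ackingListener, sharedDurable, null, windowSize);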
Example 2
You can also set the prefetch size for a synchronous client when using a channel iterator. Here is an example:
nSessionAttributes sessionAttr = new nSessionAttributes("nsp://localhost:9000");
nSession session = nSessionFactory.create(sessionAttr);
session.init();
nChannelAttributes channelAttr = new nChannelAttributes("channel");
nChannel channel = session.createChannel(channelAttr);
boolean isPersistent = true;
boolean isClusterWide = false;
int startEventId = 0;
nDurable sharedDurable = null;
// Create the shared durable attributes.
nDurableAttributes attr =
    nDurableAttributes.create(nDurableAttributes.nDurableType.Shared,
        "shared-durable");
attr.setPersistent(isPersistent);
attr.setClustered(isClusterWide);
attr.setStartEID(startEventId);
// Need to check first in case the shared durable already exists!
try {
    sharedDurable = channel.getDurableManager().get(attr.getName());
} catch (nNameDoesNotExistException ex) {
    sharedDurable = channel.getDurableManager().add(attr);
}
int prefetchSize = 4;
long timeout = 1000; // maximum wait for events, in milliseconds (illustrative value)
nChannelIterator iterator = channel.createIterator(sharedDurable, null);
List<nConsumeEvent> evts = iterator.getNext(prefetchSize, timeout);
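The synchronous client can then process the prefetched batch before requesting the next one. A minimal sketch, again assuming events can be acknowledged individually with nConsumeEvent.ack():
for (nConsumeEvent evt : evts) {
    System.out.println("Consumed event " + evt.getEventID());
    // Acknowledge the event; rolling back instead would leave it on the
    // durable for redelivery. (Sketch: assumes nConsumeEvent.ack().)
    evt.ack();
}
// Prefetch the next batch once the current one has been processed.
evts = iterator.getNext(prefetchSize, timeout);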
Checking the Number of Unprocessed Events on a Durable
You can use the API method getDepth() on nDurableNode to get the number of events outstanding for a durable.
For non-shared durables, this method returns an estimate of the number of outstanding events. This estimate does not include unacknowledged, purged, or filtered events.
Example
// CHANNEL_NAME and DURABLE_NAME are the names of the channel and the durable
// subscription to inspect.
nRealmNode realmNode =
    new nRealmNode(new nSessionAttributes("nsp://localhost:9000"));
nTopicNode topicNode = (nTopicNode) realmNode.findNode(CHANNEL_NAME);
nDurableNode durableNode = topicNode.getDurable(DURABLE_NAME);
long eventCount = durableNode.getDepth();
The number of outstanding events waiting for a commit or a rollback can be obtained as follows:
long unackedCount = durableNode.getTransactionDepth();
Storage Considerations
There are some storage considerations to be aware of when using shared durable subscriptions as opposed to normal (i.e. non-shared) durable subscriptions.
When you define a shared durable subscription, Universal Messaging maintains a copy of all of the events of the durable subscription in an internal store. The copies remain in the internal store until the original events have been accessed by the consumer. The benefit for the consumer is that events are held until the consumer is ready to receive them.
If the consumer accesses the events using event filtering, some events are never selected, and therefore the copies of these events remain in the internal store. As mentioned earlier, when an event is consumed for a normal durable subscription, all earlier events are also removed; however, for a shared durable subscription, only the consumed event is removed from the store. This is by design, and can, in time, lead to Universal Messaging retaining large numbers of events that are never used. If you have made the shared durable subscription persistent, a copy of the internal store will be maintained on disk, and this disk copy will also contain copies of the events that are not used.
In order to delete events from the internal store, you can purge events from the channel on which the durable subscription is created. The purge also flows through to the internal store.
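For example, here is a minimal sketch of such a purge, assuming channel is the nChannel the shared durable subscription was created on, the nChannel purge method purgeEvents(long startEID, long endEID), and an illustrative event ID range:
// Purge events 0 to 1000 from the channel; the purge flows through to the
// internal store of any shared durable subscription created on this channel.
channel.purgeEvents(0, 1000);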
The set of events that are copied to the internal store for a given shared durable subscription is limited according to the following rules:
When the first consumer connects to the shared durable subscription and specifies an event filter, only events that match the filter are copied to the internal store (see the sketch after this list).
If a second consumer connects to the same shared durable subscription and also specifies an event filter, then from that time onwards, only events matching the second filter will be copied to the internal store. The first consumer receives an asynchronous exception due to the changed filter, but still uses its original filter to take events off the internal store.
If the second consumer specifies no event filter, the filter of the first consumer remains in effect for copying events to the internal store.
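For example, the first consumer might attach to the shared durable with a selector. Here is a minimal sketch using the addSubscriber signature from Example 1; the filter expression and the "priority" property are illustrative:
// Only events whose "priority" property matches the filter are copied to the
// internal store and delivered to this consumer.
channel.addSubscriber(listener, sharedDurable, "priority > 5", windowSize);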