Universal Messaging 10.11 | Developer Guide | Enterprise APIs | Enterprise Developer's Guide for C# | Publish / Subscribe using Channel Topics | Using Durable Objects
 
Using Durable Objects
This section describes how to use durable objects and shared durable objects using the Universal Messaging API for C#.
Creating durable objects
For operations like creating, retrieving, deleting or unbinding a durable object, the nDurableManager must be used. Every channel has a durable manager associated with it. For information on how to use it, check the following examples.
Example 1
Getting the durable manager:

nSession session = nSessionFactory.create(new nSessionAttributes("nsp://localhost:11000"));
session.init();
nChannelAttributes channelAttributes = new nChannelAttributes("testChannel");
nChannel channel = session.createChannel(channelAttributes);
nDurableManager durableManager = channel.getDurableManager();
Example 2
Creating durable objects using the durable manager instance from the example above:

nDurableAttributes exclusiveAttributes = nDurableAttributes.create(
nDurableAttributes.nDurableType.Named, "exclusive");
nDurable exclusiveDurable = durableManager.add(exclusiveAttributes);

nDurableAttributes sharedAttributes = nDurableAttributes.create(
nDurableAttributes.nDurableType.Shared, "shared");
nDurable sharedDurable = durableManager.add(sharedAttributes);

nDurableAttributes serialAttributes = nDurableAttributes.create(
nDurableAttributes.nDurableType.Serial, "serial");
nDurable serialDurable = durableManager.add(serialAttributes);
Here we have listed the different options to create a new durable object. It can be either shared, serial or non-shared. The non-shared durable object can be a simple exclusive durable object where only one subscriber can be attached at a time.
Example 3
Deleting a durable object:
durableManager.delete(exclusiveDurable);
Example 4
Retrieving a durable object:
*Retrieving the durable object can be done using only the name of the durable object:
nDurable returnedDurable = durableManager.get("shared");
*Or you can also retrieve all durables for the channel to which the durable manager corresponds:
nDurable[] allDurables = durableManager.getAll();
Shared durables support purging events - by filter, by event id, by range of event ids or all of the events. The following methods defined on nDurable can be used. They provide an implementation only for shared durables:

void remove(long eid)

void remove(long start, long end)

void remove(string filter)

void removeAll()
For the other durable types an nIllegalStateException is thrown.
Creating a durable subscription
After durable subscriptions are created on the server, a consumer can be created using the nChannel instance. You can create both synchronous and asynchronous subscriptions.
For creating asynchronous durable consumers, the following methods can be used:
void addSubscriber(nEventListener listener, nDurable durable)

void addSubscriber(nEventListener listener, nDurable durable,
string selector, bool autoAck)

void addSubscriber(nEventListener listener, nDurable durable,
nNamedPriorityListener priorityListener)

void addSubscriber(nEventListener listener, nDurable durable,
string selector, bool autoAck, nNamedPriorityListener priorityListener)
The API for creating synchronous durable subscribers is:
nChannelIterator createIterator(nDurable durable)

nChannelIterator createIterator(nDurable durable, string selector)
A public interface is available for committing and rolling back events. The methods are defined for the nDurable instance. To be able to apply these operations on a single event and not only on consecutive event IDs is of significant importance for the shared types.
Here are the methods defined in the nDurable class:
void acknowledge(nConsumeEventToken eventToken, bool isSynchronous)

void rollback(nConsumeEventToken eventToken, bool isSynchronous)
By default, these methods include previous outstanding events that were received in the same connection.
This functionality is extended for the durables which are shared. On an nSharedDurable instance you can also invoke:

void acknowledge(List<nConsumeEventToken> eventTokens, bool isSynchronous)

void acknowledge(nConsumeEventToken eventToken, bool isSynchronous,
bool includePreviousEventsOutstanding)

void rollback(List<nConsumeEventToken> eventTokens, bool isSynchronous)

void rollback(nConsumeEventToken eventTocken, bool isSynchronous,
bool includePreviousEventsOutstanding)
Note:
There is a difference in the behavior regarding the re-delivery of rolled back events. If the durable object is shared, events are redelivered immediately. But for exclusive durables, events are redelivered after re-subscribing.
The nConsumeEventToken is an abstraction for the event identifier and is built internally. The value can be retrieved using the public nConsumeEventToken getEventIdentifier() method defined for the nConsumeEvent instance.
nDurableViewer instances can be created for browsing the events on the durable object. Its API supports the following methods:
nConsumeEvent next()

void close()
for retrieving the next event and closing the viewer.
Restrictions
The nAdminAPI has no additional extensions for working with nDurable objects. For example, nDurableNode and nDurableConnectionNode, available in the client API for Java, are not ported in the client API for C# .
API Support for Reactive Extensions (Rx)
The Universal Messaging native client API provides support for Reactive Extensions (Rx). This support provides the ability to create shared or exclusive / named durable subscribers using the following method on the topic retrieved from the initialized session:

ITransactionalConsumer CreateDurableConsumer(
string name, string unique, DurableType durableType,
string filter, bool subscribeToPurge = false)
A DurableType enumeration has been added with the available options: Named and Shared.
See the following example:

using (var session = new Session("nsp://localhost:11000"))
{
// Initialize the session
session.Initialize();

// Create consumer & subscribe
consumer = session.Topics.CreateDurableConsumer(
"channel", "durable", DurableType.Named, "filter");
var query =
from e in consumer.ToObservable()
select e.Message;

// Subscribe
query.Subscribe(ProcessMessage);
}

// Example function to process messages
void ProcessMessage(IMessage message)
{
if (message != null)
{
Console.WriteLine("Message received {0}.", message.Id);
}
}
The consumer implements ITransactionalConsumer, so you would be able to commit or roll back events by simply invoking consumer.commit() or consumer.rollback(), which will commit or roll back all of the unacknowledged events up to and including the last received event.