MyChannels.Nirvana API: Reactive Extensions
Reactive Extensions for .NET (commonly referred to as "Rx") is a new library currently under development by
Microsoft that aims to allow the development of so-called "reactive" applications, by exposing the Observer pattern (as seen in C# Multicast delegates and Events), but in a simpler, more intuitive manner.
Nirvana.Reactive
The Universal Messaging Reactive library for .NET aims to make use of the capabilities offered by Rx, by allowing the conversion from Universal Messaging objects to Observable sequences and vice versa.
Currently, the library only supports the conversion from Universal Messaging objects to Observable sequences, and is designed to work with the MyChannels.Nirvana API. One main method is included: ToObservable(), which converts the messages from either a IConsumer (Topics and Queues) or a IDataGroupSession. This means that consuming messages on a Topic or Queue looks distinctly different from the more conventional Consumer method.
var consumer = session.Topics.CreateConsumer("Topic1");
var query = from e in consumer.ToObservable()
select e.Message;
// Subscribe
query.Subscribe(ProcessMessage);
//...
public void ProcessMessage(object m)
{
Console.WriteLine("Message: {0}", ((Message)m).Id);
}
This looks somewhat confusing at first glance, but is simple enough when broken down. The ToObservable() call on the Topic Consumer returns an Observable sequence of MessageEventArgs, as returned when the MessageReceived event is fired in the MyChannels.Nirvana API on the Consumer. The query simply filters that sequence to obtain the Messages from each MessageEventArgs. The Subscribe() method allows a handling method to be attached to the Observable sequence, just as one would attach an event handler to an typical event. In this case, the ProcessMessage() method simply writes the Id of the message received to the console.
DataGroups work in a similar fashion. As DataGroups do not have Consumers in the manner of Topics and Queues, the ToObservable() method is instead called on the IDataGroupSession object, returning an Observable sequence which can be manipulated in an identical fashion.
var query = from e in session.DataGroups.ToObservable()
select e.Message;
// Subscribe
query.Subscribe(ProcessMessage);