Universal Messaging C# .NET: RX Topic Subscriber
This example shows how to create a Topic Subscriber using the Universal Messaging Reactive library.
Application Source Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
// NOTE(review): the namespace previously read "MyChannels.Universal Messaging.Samples", which is
// not a valid C# identifier (it contains a space). It has been restored to the API's namespace
// root so that Session, Message and ToObservable() presumably resolve through the enclosing
// MyChannels.Nirvana namespace - confirm against the referenced Universal Messaging assemblies.
namespace MyChannels.Nirvana.Samples
{
    /// <summary>
    /// This is an example of how the RX extensions to the Universal Messaging .NET extended API
    /// enable integration with the Reactive Framework from Microsoft.
    /// </summary>
    public class RXTopicSubscriber
    {
        // the url of the Universal Messaging realm
        private readonly string url;

        // the channel to subscribe to
        private readonly string channel;

        /// <summary>
        /// Stores the realm url and the channel name, then starts the subscription.
        /// The constructor only returns once <c>start()</c> has returned, i.e. after a line
        /// has been read from the console.
        /// </summary>
        /// <param name="rname">rname url of the realm</param>
        /// <param name="cname">name of the channel to subscribe to</param>
        public RXTopicSubscriber(string rname, string cname)
        {
            url = rname;
            channel = cname;
            start();
        }

        /// <summary>
        /// Initialise the Session object, set up the Observable query, and keep the
        /// subscription alive until a line is entered on the console.
        /// </summary>
        private void start()
        {
            using (var session = new Session(url))
            {
                // Initialize the session
                session.Initialize();

                // Create consumer & project each event onto its message
                var consumer = session.Topics.CreateConsumer(channel);
                var query =
                    from e in consumer.ToObservable()
                    select e.Message;

                // Subscribe; the returned handle is disposed when this inner scope ends, so the
                // subscription is released before the enclosing session is disposed.
                using (query.Subscribe(ProcessMessage))
                {
                    // Block until a line is entered on the console (Enter), then exit
                    Console.ReadLine();
                    Console.WriteLine("Exiting the application");
                }
            }
        }

        /// <summary>
        /// Deal with the message: writes the message's Id to the console.
        /// </summary>
        /// <param name="m">the Message object received; it is cast to Message</param>
        public void ProcessMessage(Object m)
        {
            Console.WriteLine("Message :" + ((Message)m).Id);
        }

        /// <summary>
        /// Entry point. Expects the realm url as the first argument and the channel name
        /// as the second.
        /// </summary>
        /// <param name="args">command line arguments: rname, channel name</param>
        private static void Main(string[] args)
        {
            // Guard against missing arguments instead of failing with IndexOutOfRangeException
            if (args.Length < 2)
            {
                Console.Error.WriteLine("Usage: RXTopicSubscriber <rname> <channel name>");
                return;
            }

            new RXTopicSubscriber(args[0], args[1]);
        }
    }
}