/*
*
* Copyright (c) 1999 - 2011 my-Channels Ltd
* Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
*
* Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
*
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MyChannels.Nirvana.Samples
{
///
/// This is an example of how RX entensions to the Nirvana .Net extended API enables integration with the Reactive Framework from Microsoft
///
public class RXQueueConsumer
{
// the url of the Nirvana realm
private string url;
// the queue to consume from
private string queue;
///
/// Pass the rname url of the realm and the queue name
///
/// rname of the realm
/// qname name of the queue
public RXQueueConsumer(string rname, string qname)
{
url = rname;
queue = qname;
start();
}
///
/// Initialise the Session object and set up the Observale query
///
private void start()
{
using (var session = new Session(url))
{
// Initialize the session
session.Initialize();
// Create consumer & query each message
var consumer = session.Queues.CreateConsumer(queue);
var query =
from e in consumer.ToObservable()
select e.Message;
// Subscribe
query.Subscribe(ProcessMessage);
// Wait for input from the console, exit on key entry
Console.ReadLine();
Console.WriteLine("Exiting the application");
}
}
///
/// Deal with the message
///
/// the Message object received
public void ProcessMessage(Object m)
{
Console.WriteLine("Message :" + ((Message)m).Id);
}
static void Main(string[] args)
{
new RXQueueConsumer(args[0], args[1]);
}
}
}