MyChannels.Universal Messaging Topic Publisher
This example shows how to create a Topic Subscriber using the MyChannels.Universal Messaging API.
Application Source Code
/*
Copyright 2012 Software AG, Darmstadt, Germany and/or Software AG USA
Inc., Reston, United States of America, and/or their licensors.
In the event that you should download or otherwise use this software
you hereby acknowledge and agree to the terms at
http://um.terracotta.org/company/terms.html#legalnotices
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using MyChannels.Universal Messaging;
using System.Threading;
namespace MyChannels.Universal Messaging.Samples
{
class EXTopicPublisher
{
private int messagesSent = 0;
private string rName = null;
private string topicName = "All";
private int count = 10;
private int size = 100;
public EXTopicPublisher(string[] args)
{
// check arguments <rname> [topicname] [count] [size]
// Need a min of 1: rname
if (args.Length < 1)
{
Console.WriteLine("rName required");
Environment.Exit(2);
}
rName = args[0];
if (args.Length > 1)
topicName = args[1];
if (args.Length > 2)
count = Convert.ToInt32(args[2]);
if (args.Length > 3)
size = Convert.ToInt32(args[3]);
start();
}
private void start()
{
var session = new Session(rName);
// Initialize the session
session.Initialize();
Console.WriteLine("Initialized...");
session.AsynchronousExceptionRaised +=
new EventHandler<AsyncExceptionEventArgs>(session_AsynchronousExceptionRaised);
session.ConnectionStatusChanged +=
new EventHandler<ConnectionStatusEventArgs>(session_ConnectionStatusChanged);
var topics = session.Topics;
//Create a byte array filled with characters equal to
// the message size specified. This could be a result
//of String.getBytes() call in a real world scenario.
byte[] buffer = new byte[size];
for (int x = 0; x < size; x++)
{
buffer[x] = (byte)((x % 90) + 32);
}
//Construct a sample Properties object
//Instantiate the message to be published with the specified Properties and buffer
Properties props = new Properties();
Message msg = new Message(props, buffer);
//Inform the user that publishing is about to start
Console.WriteLine("Starting publish of " + count + " events with a size of " + size + " bytes each");
if (topicName == "All")
{
//IProducer producer = queues.CreateProducer(qName);
//for (int x = 0; x < count; x++)
//{
// //Publish the event
// producer.Send(msg);
//}
Console.WriteLine("Publishing to all Queues is not supported at present.");
}
else
{
IProducer producer = topics.CreateProducer(topicName);
for (messagesSent = 0; messagesSent < count; messagesSent++)
{
//Publish the event
producer.Send(msg);
}
}
session.Dispose();
}
void session_ConnectionStatusChanged(object sender, ConnectionStatusEventArgs e)
{
while (!e.IsConnected)
{
Console.WriteLine("Disconnected from Universal Messaging, Sleeping for 1 second...");
try
{
Thread.Sleep(1000);
}
catch (Exception) { }
}
// Decrement the sent messages loop, so that the message sending while the disconnect
//occurred is sent again
--messagesSent;
}
void session_AsynchronousExceptionRaised(object sender, AsyncExceptionEventArgs e)
{
Console.WriteLine("Exception raised: {0}", e.Error.GetType());
Console.WriteLine("Exception message: {0}", e.Error.Message);
Environment.Exit(2);
}
public static void Main(string[] args)
{
new EXTopicPublisher(args);
}
}
}