/*
*
* Copyright (c) 1999 - 2011 my-Channels Ltd
* Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
*
* Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
*
*/
using System;
using System.Collections.Generic;
using System.Threading;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
/// <summary>
/// nDataGroup publisher sample application.
///
/// Uses the data group named on the command line, creating it if it does not exist. All known data stream connections
/// (except this publisher's own) are added to that group, and further streams are added when a callback reports a new
/// stream connection. While no streams are registered, publishing waits; once streams exist, publishing to the group resumes.
/// </summary>
class DataGroupPublisher : nSampleApp, nDataGroupListener
{
// Checked by doit after every publish; doit throws asyncException once this is false.
// NOTE(review): nothing in this file ever sets it to false - confirm whether it is still needed.
private bool isOk = true;
// Exception instance thrown by doit when isOk is false.
private nBaseClientException asyncException = new nBaseClientException("ASync Exception");
// The data group being published to; assigned in doit.
private nDataGroup myGroup = null;
// Whether myGroup currently has streams. It is written by the stream callbacks and polled by the
// publish loop in doit. The callbacks wake a publisher blocked in Monitor.Wait, so they run on
// another thread; volatile keeps the polled read from being cached or hoisted.
private volatile bool hasStreams = false;
// Monitor object the publisher waits on and the callbacks pulse. readonly so the lock target
// can never be replaced while a thread is waiting on it.
private readonly Object mutex = new object();
// The single application instance, created in Main.
private static DataGroupPublisher mySelf = null;
// NOTE(review): not referenced anywhere in this class.
private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();
/// <summary>
/// Demonstrates the Nirvana API calls necessary to publish to an nDataGroup.
/// It is called after all command line arguments have been received and validated.
/// </summary>
/// <remarks>
/// Steps: construct the session, find or create the data group, add the streams of the default
/// data group to it, publish <paramref name="count"/> copies of one event, print the achieved
/// rates and finally close the session. The realm errors caught below are reported on the
/// console and terminate the process with exit code 1; any other exception propagates to the caller.
/// </remarks>
/// <param name="realmDetails">a String[] containing the possible RNAME values</param>
/// <param name="groupName">the data group name to publish to</param>
/// <param name="count">the number of messages to publish</param>
/// <param name="size">the size (bytes) of each message to be published</param>
/// <param name="isMulticast">whether the data group is multicast enabled</param>
/// <param name="isConflated">whether the data group has a conflation policy</param>
/// <param name="conflationType">type of conflation</param>
/// <param name="conflationInterval">millisecond interval for conflation with throttle</param>
private void doit(String[] realmDetails, String groupName, int count, int size, bool isMulticast, bool isConflated, int conflationType, int conflationInterval)
{
    // Construct the session on this instance (not via the static singleton) so the session
    // that is built is the one used below.
    constructSession(realmDetails);
    try
    {
        resolveGroup(groupName, isMulticast, isConflated, conflationType, conflationInterval);

        // get the default group to determine which stream users are connected
        nDataGroup defaultGroup = mySession.getDefaultDataGroup(this);
        var connectedStreams = defaultGroup.getStreams();
        hasStreams = myGroup.size() > 0;
        // add each connected stream to the group, skipping our own stream and admin subjects.
        // The iteration variable is typed explicitly so Name/Subject resolve whatever
        // enumerable type getStreams() returns.
        foreach (nDataStream stream in connectedStreams)
        {
            if (!stream.Name.Equals(mySession.getStreamId()) && (!stream.Subject.Contains("admin")))
            {
                myGroup.add(stream);
            }
        }

        //Create a byte array filled with characters equal to
        //the message size specified. This could be a result
        //of String.getBytes() call in a real world scenario.
        byte[] buffer = new byte[size];
        for (int i = 0; i < size; i++)
        {
            buffer[i] = (byte)((i % 90) + 32);
        }

        //Instantiate the message to be published with the specified nEventProperties / byte[]
        nEventProperties props = new nEventProperties();
        //You can add other types in a dictionary object
        props.put("key0string", "1");
        props.put("key1int", (int)1);
        props.put("key2long", (long)-11);
        nConsumeEvent evt1 = new nConsumeEvent(props, buffer);

        //Inform the user that publishing is about to start
        Console.WriteLine("Starting publish of " + count + " events with a size of " + size + " bytes each");

        // Stopwatch is monotonic, so the measured interval is not affected by wall-clock adjustments
        System.Diagnostics.Stopwatch timer = System.Diagnostics.Stopwatch.StartNew();
        publishEvents(evt1, count);
        //Check if an asynchronous exception has been received
        if (!isOk)
        {
            //If it has, then throw it
            throw asyncException;
        }
        timer.Stop();

        reportRates(count, size, timer.Elapsed);
    }
    catch (nSessionPausedException)
    {
        Console.WriteLine("Session has been paused, please resume the session");
        Environment.Exit(1);
    }
    catch (nSecurityException)
    {
        Console.WriteLine("Unsufficient permissions for the requested operation.");
        Console.WriteLine("Please check the ACL settings on the server.");
        Environment.Exit(1);
    }
    catch (nSessionNotConnectedException)
    {
        Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
        Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
        Environment.Exit(1);
    }
    catch (nUnexpectedResponseException)
    {
        Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
        Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
        Environment.Exit(1);
    }
    catch (nRequestTimedOutException)
    {
        Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
        Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
        Environment.Exit(1);
    }
    finally
    {
        // Cleanup lives in finally so it is also attempted when an exception that is not
        // handled above leaves this method.
        //Close the session we opened
        try
        {
            nSessionFactory.close(mySession);
        }
        catch (Exception closeError)
        {
            // Best effort: a failed close must not prevent the shutdown below
            Console.WriteLine("Ignoring error while closing the session : " + closeError.Message);
        }
        //Close any other sessions so that we can exit
        nSessionFactory.shutdown();
    }
}

// Points myGroup at the existing data group called groupName, or creates that group when
// the session does not report one with that name.
private void resolveGroup(String groupName, bool isMulticast, bool isConflated, int conflationType, int conflationInterval)
{
    // Explicitly typed iteration variable: Name must resolve whatever enumerable type is returned
    foreach (nDataGroup candidate in mySession.getDataGroups(this))
    {
        if (candidate.Name.Equals(groupName))
        {
            // Assign the field before registering, so myGroup is already set if a
            // callback runs during addListener
            myGroup = candidate;
            myGroup.addListener(this);
            break;
        }
    }
    if (myGroup != null)
    {
        return;
    }
    // group not found, create it
    if (isConflated)
    {
        //Create conflation attributes based on the given parameters
        nConflationAttributes conflationAttributes = new nConflationAttributes(conflationType, conflationInterval);
        myGroup = mySession.createDataGroup(groupName, this, conflationAttributes, isMulticast);
    }
    else
    {
        //Create a standard data group
        myGroup = mySession.createDataGroup(groupName, this, isMulticast);
    }
}

// Blocks the calling thread until hasStreams is true.
private void waitForStreams()
{
    lock (mutex)
    {
        // The flag is tested while holding the lock: a PulseAll sent between the test and
        // the Wait can then no longer be lost, which previously left the publisher asleep.
        bool waited = false;
        while (!hasStreams)
        {
            Console.WriteLine("No streams are registered, waiting for notification");
            Monitor.Wait(mutex);
            waited = true;
        }
        if (waited)
        {
            Console.WriteLine("Streams are registered starting to publish");
        }
    }
}

// Publishes evt to myGroup count times, retrying a publish that failed because the session
// was disconnected.
private void publishEvents(nConsumeEvent evt, int count)
{
    int published = 0;
    while (published < count)
    {
        waitForStreams();
        try
        {
            //Publish the event
            mySession.writeDataGroup(evt, myGroup);
            published++;
        }
        catch (nSessionNotConnectedException)
        {
            // published is not advanced, so the event that failed is sent again after reconnect
            while (!mySession.isConnected())
            {
                Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second...");
                try
                {
                    Thread.Sleep(1000);
                }
                catch (ThreadInterruptedException)
                {
                    // An interrupted sleep just means the connection is re-checked sooner
                }
            }
        }
        catch (nBaseClientException ex)
        {
            Console.WriteLine("Publish.cs : Exception : " + ex.Message);
            // plain rethrow keeps the original stack trace (throw ex would reset it)
            throw;
        }
        //Check if an asynchronous exception has been received
        if (!isOk)
        {
            //If it has, then throw it
            throw asyncException;
        }
    }
}

// Prints the publish duration, the events/sec and bytes/sec rates, and the session byte counters.
private void reportRates(int count, int size, TimeSpan elapsed)
{
    // Clamp to 1 ms: a run shorter than a millisecond (for example count == 0) would otherwise
    // divide by zero. The multiplication is done in long so large counts cannot overflow int.
    long elapsedMs = Math.Max(1L, (long)elapsed.TotalMilliseconds);
    //Calculate the events / sec rate
    long eventPerSec = (count * 1000L) / elapsedMs;
    //Calculate the bytes / sec rate
    long bytesPerSec = eventPerSec * size;
    //Inform the user of the resulting rates
    Console.WriteLine("Publish Completed in " + elapsed.TotalMilliseconds + "ms :");
    Console.WriteLine("[Events Published = " + count + "] [Events/Second = " + eventPerSec + "] [Bytes/Second = " + bytesPerSec + "]");
    Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
}
/// <summary>
/// Event callback. This sample takes no action on the delivered event.
/// </summary>
/// <param name="evt">the delivered event (unused)</param>
public void onMessage(nConsumeEvent evt)
{
    // no-op
}
/// <summary>
/// Callback: a stream has been added to a data group.
/// When a non-admin stream joins the group being published to, the publisher is told that
/// streams are available. A non-admin stream reported for any other group is added to the
/// group being published to, unless it is this session's own stream.
/// </summary>
/// <param name="group">the group the stream was added to</param>
/// <param name="stream">the stream that was added</param>
/// <param name="count">the size of <paramref name="group"/> after the addition</param>
public void addedStream(nDataGroup group, nDataStream stream, int count)
{
    Console.WriteLine(group.Name + " : New stream " + stream.Name + " : " + stream.Subject + " added, size now " + count);

    // myGroup is only assigned partway through doit, after this object has already been
    // handed to the session; presumably a callback can arrive before that, so guard
    // against dereferencing null.
    nDataGroup target = myGroup;
    if (target == null)
    {
        return;
    }

    // Streams with an admin subject are ignored entirely
    if (stream.Subject.Contains("admin"))
    {
        return;
    }

    if (group.Name.Equals(target.Name) && count > 0)
    {
        Console.WriteLine(group.Name + " : " + count + " Streams are registered, starting publisher thread");
        lock (mutex)
        {
            // Set the flag under the same lock the publisher holds while testing it, so the
            // update and the wake-up are observed together.
            hasStreams = true;
            Monitor.PulseAll(mutex);
        }
    }
    else if (!stream.Name.Equals(mySession.getStreamId()))
    {
        target.add(stream);
    }
}
/// <summary>
/// Callback: a stream has been removed from a data group.
/// Updates the "streams available" flag when the affected group is the one being published to.
/// </summary>
/// <param name="group">the group the stream was removed from</param>
/// <param name="stream">the stream that was removed</param>
/// <param name="count">the size of <paramref name="group"/> after the removal</param>
/// <param name="serverInduced">not used by this sample</param>
public void deletedStream(nDataGroup group, nDataStream stream, int count, bool serverInduced)
{
    Console.WriteLine(group.Name + " : Data stream " + stream.Name + " : " + stream.Subject + " removed, size now " + count);

    // myGroup is only assigned partway through doit; ignore callbacks that arrive earlier
    // and callbacks for any other group.
    nDataGroup target = myGroup;
    if (target == null || !group.Name.Equals(target.Name))
    {
        return;
    }

    if (count == 0)
    {
        lock (mutex)
        {
            hasStreams = false;
        }
        Console.WriteLine("No streams are registered, stopping publisher thread");
    }
    else if (count > 0)
    {
        lock (mutex)
        {
            hasStreams = true;
            // Wake the publisher in case it is waiting; previously the flag was set without a pulse
            Monitor.PulseAll(mutex);
        }
        Console.WriteLine(group.Name + " : " + count + " Streams are registered, starting publisher thread");
    }
}
/// <summary>
/// Callback: a data group has been added to another data group. Logs the event.
/// </summary>
/// <param name="to">the group that received the new member</param>
/// <param name="group">the group that was added</param>
/// <param name="count">the size of <paramref name="to"/> after the addition</param>
public void addedGroup(nDataGroup to, nDataGroup group, int count)
{
    string report = string.Concat(to.Name, " : Data group ", group.Name, " added, size now ", count);
    Console.WriteLine(report);
}
/// <summary>
/// Callback: a data group has been removed from another data group. Logs the event.
/// </summary>
/// <param name="from">the group that lost the member</param>
/// <param name="group">the group that was removed</param>
/// <param name="count">the size of <paramref name="from"/> after the removal</param>
public void removedGroup(nDataGroup from, nDataGroup group, int count)
{
    string report = string.Concat(from.Name, " : Data group ", group.Name, " removed, size now ", count);
    Console.WriteLine(report);
}
/// <summary>
/// Callback: a data group has been deleted. Logs the name of the group.
/// </summary>
/// <param name="group">the group that was deleted</param>
public void deletedGroup(nDataGroup group)
{
    string report = string.Concat("Data group deleted : ", group.Name);
    Console.WriteLine(report);
}
/// <summary>
/// Callback: a data group has been created. Logs the name of the group.
/// </summary>
/// <param name="group">the group that was created</param>
public void createdGroup(nDataGroup group)
{
    string report = string.Concat("Data group created : ", group.Name);
    Console.WriteLine(report);
}
/// <summary>
/// Parses the command line and runs the sample.
/// Positional arguments: rname, group name, count, then optionally size, multicast flag,
/// conflate flag, conflation type (merge or drop) and conflation interval.
/// Prints the usage text and exits with code 2 when fewer than three arguments are given or
/// when a value cannot be parsed.
/// </summary>
/// <param name="args">the command line arguments</param>
protected override void processArgs(String[] args)
{
    //
    // Need a min of 3: rname, group name, count
    if (args == null || args.Length < 3)
    {
        Usage();
        Environment.Exit(2);
    }
    String RNAME = args[0];
    String groupName = args[1];

    int count = parseIntArgOrExit(args, 2, 1000);
    int size = parseIntArgOrExit(args, 3, 1024);
    bool isMulticast = parseBoolArgOrExit(args, 4, false);
    bool isConflated = parseBoolArgOrExit(args, 5, false);

    int conflationType = nConflationAttributes.sMergeEvents;
    if (args.Length > 6)
    {
        // The conflation type is the seventh argument. It used to be read from args[5],
        // which is the conflate flag, so "drop" could never be selected.
        String requestedType = args[6];
        if (String.Equals(requestedType, "merge", StringComparison.OrdinalIgnoreCase))
        {
            conflationType = nConflationAttributes.sMergeEvents;
        }
        else if (String.Equals(requestedType, "drop", StringComparison.OrdinalIgnoreCase))
        {
            conflationType = nConflationAttributes.sDropEvents;
        }
        else
        {
            Console.WriteLine("Unknown conflation type '" + requestedType + "', using merge");
        }
    }

    int conflationInterval = parseIntArgOrExit(args, 7, 500);
    //
    // Run the sample app
    //
    doit(parseRealmProperties(RNAME), groupName, count, size, isMulticast, isConflated, conflationType, conflationInterval);
}

// Returns args[index] parsed as a non-negative int, or defaultValue when that argument is
// absent. Prints the usage text and exits with code 2 when the value is not a valid
// non-negative integer.
private static int parseIntArgOrExit(String[] args, int index, int defaultValue)
{
    if (args.Length <= index)
    {
        return defaultValue;
    }
    int parsed;
    if (int.TryParse(args[index], out parsed) && parsed >= 0)
    {
        return parsed;
    }
    Console.WriteLine("Invalid value '" + args[index] + "' for argument " + (index + 1) + ", expected a non-negative integer");
    Usage();
    Environment.Exit(2);
    return defaultValue; // not reached: Environment.Exit terminates the process
}

// Returns args[index] parsed as a bool, or defaultValue when that argument is absent.
// Prints the usage text and exits with code 2 when the value is not true or false.
private static bool parseBoolArgOrExit(String[] args, int index, bool defaultValue)
{
    if (args.Length <= index)
    {
        return defaultValue;
    }
    bool parsed;
    if (bool.TryParse(args[index], out parsed))
    {
        return parsed;
    }
    Console.WriteLine("Invalid value '" + args[index] + "' for argument " + (index + 1) + ", expected true or false");
    Usage();
    Environment.Exit(2);
    return defaultValue; // not reached: Environment.Exit terminates the process
}
/// <summary>
/// Program entry point: creates the application instance, stores it in the static
/// self-reference and hands it the command line.
/// </summary>
/// <param name="args">the command line arguments</param>
public static void Main(String[] args)
{
    DataGroupPublisher app = new DataGroupPublisher();
    // The static reference is set before argument processing starts
    mySelf = app;
    app.processArgs(args);
}
/// <summary>
/// Prints the usage message for this class. The argument order and the defaults listed here
/// mirror what processArgs parses.
/// </summary>
private static void Usage()
{
    Console.WriteLine("Usage ...\n");
    Console.WriteLine("DataGroupPublish <rname> <group name> <count> [size] [multicast] [conflate] [conflation merge or drop] [conflation interval]\n");
    Console.WriteLine(
        "<Required Arguments> \n");
    Console.WriteLine(
        "<rname> - the rname of the server to connect to");
    Console.WriteLine(
        "<group name> - Data group name parameter to publish to");
    Console.WriteLine(
        "<count> - The number of events to publish");
    Console.WriteLine(
        "\n[Optional Arguments] \n");
    Console.WriteLine(
        "[size] - The size (bytes) of the event to publish (default: 1024)");
    Console.WriteLine(
        "[multicast] - enable multicast true or false (default: false)");
    Console.WriteLine(
        "[conflate] - enable conflation true or false (default: false)");
    Console.WriteLine(
        "[conflation merge or drop] - merge to enable merge or drop to enable drop (default: merge)");
    Console.WriteLine(
        "[conflation interval] - the interval for conflation to publish (default: 500)");
}
}
}