/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ using System; using System.Collections.Generic; using System.Threading; using com.pcbsys.nirvana.client; namespace com.pcbsys.nirvana.apps { /// /// nDataGroup publisher sample application /// /// Will create a group specified by the parameter passed on the command line. All known data stream connections (except itself) will /// then be added to the created group when callback is received for new stream connections. When no stream connections exist, publishing stops. /// When streams exist, publishing to the created group commences. /// class DataGroupPublisher : nSampleApp, nDataGroupListener { private bool isOk = true; private nBaseClientException asyncException = new nBaseClientException("ASync Exception"); private nDataGroup myGroup = null; private bool hasStreams = false; private Object mutex = new object(); private static DataGroupPublisher mySelf = null; private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding(); /** * This method demonstrates the Nirvana API calls necessary to publish to * an nDataGroup. * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param groupName the data group name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published * @param isMulticast whether the data group is multicast enabled * @param isConflated whether the data group has a conflation policy * @param conflationType type of conflation * @param conflationInterval millisecond interval for conflation with throttle */ private void doit(String[] realmDetails, String groupName, int count, int size, bool isMulticast, bool isConflated, int conflationType, int conflationInterval) { mySelf.constructSession(realmDetails); try { IEnumerable grps = mySession.getDataGroups(this); foreach (var grp in grps) { if (grp.Name.Equals(groupName)) { myGroup = grp; myGroup.addListener(this); break; } } // if group not found, create it if (myGroup == null) { if (isConflated) { //Create a conflation attributes based on given parameters nConflationAttributes conflationAttributes = new nConflationAttributes(conflationType, conflationInterval); myGroup = mySession.createDataGroup(groupName, this, conflationAttributes, isMulticast); } else //Create a standard data group myGroup = mySession.createDataGroup(groupName, this, isMulticast); } // get the default group to determine how many stream users are connected nDataGroup defaultGroup = mySession.getDefaultDataGroup(this); // just add each connected stream to the group IEnumerable streams = defaultGroup.getStreams(); hasStreams = myGroup.size() > 0; foreach (var stream in streams) { if (!stream.Name.Equals(mySession.getStreamId()) && (!stream.Subject.Contains("admin"))) { myGroup.add(stream); } } //Create a byte array filled with characters equal to // the message size specified. This could be a result //of String.getBytes() call in a real world scenario. byte[] buffer = new byte[size]; for (int x = 0; x < size; x++) { buffer[x] = (byte)((x % 90) + 32); } //Instantiate the message to be published with the specified nEventPropeties / byte[] nEventProperties props = new nEventProperties(); //You can add other types in a dictionary object props.put("key0string", "1"); props.put("key1int", (int)1); props.put("key2long", (long)-11); nConsumeEvent evt1 = new nConsumeEvent(props, buffer); //Inform the user that publishing is about to start Console.WriteLine("Starting publish of " + count + " events with a size of " + size + " bytes each"); //Get a timestamp to be used to calculate the message publishing rates DateTime start = DateTime.Now; //Loop as many times as the number of messages we want to publish for (int x = 0; x < count; x++) { while (!hasStreams) { lock (mutex) { Console.WriteLine("No streams are registered, waiting for notification"); Monitor.Wait(mutex); if (hasStreams) { Console.WriteLine("Streams are registered starting to publish"); } } } try { //Publish the event mySession.writeDataGroup(evt1, myGroup); //Thread.Sleep(1000); } catch (nSessionNotConnectedException ) { while (!mySession.isConnected()) { Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second..."); try { Thread.Sleep(1000); } catch (Exception ) { } } x--; //We need to repeat the publish for the event publish that caused the exception, //so we reduce the counter } catch (nBaseClientException ex) { Console.WriteLine("Publish.cs : Exception : " + ex.Message); throw ex; } //Check if an asynchronous exception has been received if (!isOk) { //If it has, then throw it throw asyncException; } } //Check if an asynchronous exception has been received if (!isOk) { //If it has, then throw it throw asyncException; } DateTime end = DateTime.Now; //Get a timestamp to calculate the publishing rates //Calculate the events / sec rate TimeSpan dif = end - start; long eventPerSec = ((count * 1000) / (long)(dif.TotalMilliseconds)); //Calculate the bytes / sec rate long bytesPerSec = eventPerSec * size; //Inform the user of the resulting rates Console.WriteLine("Publish Completed in " + dif.TotalMilliseconds + "ms :"); Console.WriteLine("[Events Published = " + count + "] [Events/Second = " + eventPerSec + "] [Bytes/Second = " + bytesPerSec + "]"); Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]"); } catch (nSessionPausedException ) { Console.WriteLine("Session has been paused, please resume the session"); Environment.Exit(1); } catch (nSecurityException ) { Console.WriteLine("Unsufficient permissions for the requested operation."); Console.WriteLine("Please check the ACL settings on the server."); Environment.Exit(1); } catch (nSessionNotConnectedException ) { Console.WriteLine("The session object used is not physically connected to the Nirvana realm."); Console.WriteLine("Please ensure the realm is up and check your RNAME value."); Environment.Exit(1); } catch (nUnexpectedResponseException ) { Console.WriteLine("The Nirvana REALM has returned an unexpected response."); Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible."); Environment.Exit(1); } catch (nRequestTimedOutException ) { Console.WriteLine("The requested operation has timed out waiting for a response from the REALM."); Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values."); Environment.Exit(1); } //Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ) { } //Close any other sessions so that we can exit nSessionFactory.shutdown(); } // callback for DataStreamListener public void onMessage(nConsumeEvent evt) { } // new stream added to default data group public void addedStream(nDataGroup group, nDataStream stream, int count) { Console.WriteLine(group.Name + " : New stream " + stream.Name + " : " + stream.Subject + " added, size now " + count); if ((!stream.Subject.Contains("admin")) && (group.Name.Equals(myGroup.Name)) && count > 0) { hasStreams = true; Console.WriteLine(group.Name + " : " + count + " Streams are registered, starting publisher thread"); lock (mutex) { Monitor.PulseAll(mutex); } } else { if ((!stream.Subject.Contains("admin")) &&!stream.Name.Equals(mySession.getStreamId())) { myGroup.add(stream); } } } // stream removed from data group public void deletedStream(nDataGroup group, nDataStream stream, int count, bool serverInduced) { Console.WriteLine(group.Name + " : Data stream " + stream.Name + " : " + stream.Subject + " removed, size now " + count); if (group.Name.Equals(myGroup.Name) && count == 0) { hasStreams = false; Console.WriteLine("No streams are registered, stopping publisher thread"); } else if (group.Name.Equals(myGroup.Name) && count > 0) { hasStreams = true; Console.WriteLine(group.Name + " : " + count + " Streams are registered, starting publisher thread"); } } // new data group added public void addedGroup(nDataGroup to, nDataGroup group, int count) { Console.WriteLine(to.Name + " : Data group " + group.Name + " added, size now " + count); } // data group removed public void removedGroup(nDataGroup from, nDataGroup group, int count) { Console.WriteLine(from.Name + " : Data group " + group.Name + " removed, size now " + count); } public void deletedGroup(nDataGroup group) { Console.WriteLine("Data group deleted : "+group.Name); } public void createdGroup(nDataGroup group) { Console.WriteLine("Data group created : " + group.Name); } protected override void processArgs(String[] args) { // // Need a min of 2, rname, group name, conflation if (args.Length < 3) { Usage(); Environment.Exit(2); } String RNAME = args[0]; var groupName = args[1]; int count = 1000; if (args.Length > 2) { count = Convert.ToInt32(args[2]); } int size = 1024; if (args.Length > 3) { size = Convert.ToInt32(args[3]); } bool isMulticast = false; if (args.Length > 4) { isMulticast = Convert.ToBoolean(args[4]); } bool isConflated = false; if (args.Length > 5) { isConflated = Convert.ToBoolean(args[5]); } int conflationType = nConflationAttributes.sMergeEvents; if (args.Length > 6) { if (args[5].ToLower() == "merge") conflationType = nConflationAttributes.sMergeEvents; if (args[5].ToLower() == "drop") conflationType = nConflationAttributes.sDropEvents; } int conflationInterval = 500; if (args.Length > 7) { conflationInterval = Convert.ToInt32(args[7]); } // // Run the sample app // mySelf.doit(parseRealmProperties(RNAME), groupName, count, size, isMulticast, isConflated, conflationType, conflationInterval); } public static void Main(String[] args) { //Create an instance for this class mySelf = new DataGroupPublisher(); //Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { Console.WriteLine("Usage ...\n"); Console.WriteLine("DataGroupPublish [count] [size] [conflation merge or drop] [conflation interval \n"); Console.WriteLine( " \n"); Console.WriteLine( " - the rname of the server to connect to"); Console.WriteLine( " - Data group name parameter to publish to"); Console.WriteLine( " - enable conflation true or false"); Console.WriteLine( "\n[Optional Arguments] \n"); Console.WriteLine( "[count] -The number of events to publish (default: 10)"); Console.WriteLine( "[size] - The size (bytes) of the event to publish (default: 100)"); Console.WriteLine( "[conflation merge or drop] - merge to enable merge or drop to enable drop (default: merge)"); Console.WriteLine( "[conflation interval] - the interval for conflation to publish(default: 500"); } } }