/*
  Copyright 1999-2011 (c) My-Channels
  Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
  Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
*/
using System;
using System.Collections.Generic;

namespace com.pcbsys.nirvana.apps
{
    using com.pcbsys.nirvana.client;

    /// <summary>
    /// Subscribes to all nirvana channels within a container to demonstrate the nSession.subscribe method
    /// </summary>
    public class sessionsubscriber : nSampleApp, nEventListener
    {
        private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();

        internal static long startEid;
        internal static string selector = null;

        // Timestamp of the start of the current reporting window
        private DateTime startTime;
        // Next event id expected by go(); a single counter shared by every subscribed channel
        private long lastEID = 0;
        // Payload bytes received in the current reporting window
        private long byteCount = 0;
        private int logLevel = 0;
        // Events received in the current reporting window; -1 until the first event arrives
        private int count = -1;
        private int totalMsgs = 0;
        private int reportCount = 10000;

        private static sessionsubscriber mySelf = null;

        /// <summary>
        /// This method demonstrates the Nirvana API calls necessary to subscribe to
        /// multiple channels in 1 method call. It is called after all command line
        /// arguments have been received and validated.
        /// </summary>
        /// <param name="realmDetails">a String[] containing the possible RNAME values</param>
        /// <param name="achannelName">the container within the namespace to subscribe to</param>
        /// <param name="selector">the subscription selector filter</param>
        /// <param name="startEid">the eid to start subscribing from</param>
        /// <param name="loglvl">the specified log level</param>
        /// <param name="repCount">the specified report count</param>
        private void doit(string[] realmDetails, string achannelName, string selector, long startEid, int loglvl, int repCount)
        {
            logLevel = loglvl;
            reportCount = repCount;
            mySelf.constructSession(realmDetails);

            // Remember a failure instead of exiting immediately, so that the session
            // is still closed and the factory shut down before the process terminates.
            int exitCode = 0;

            try
            {
                // Get the channels within the specified container within the namespace
                nChannelAttributes[] channels = mySession.getChannels(achannelName);

                // Create one subscription request per channel, all delivering to this listener
                nSubscriptionAttributes[] attrs = new nSubscriptionAttributes[channels.Length];
                for (int x = 0; x < attrs.Length; x++)
                {
                    attrs[x] = new nSubscriptionAttributes(channels[x].getFullName(), selector, startEid, this);
                }

                // Subscribe to every channel in a single call
                mySession.subscribe(attrs);

                // Report the outcome of each individual subscription
                for (int x = 0; x < attrs.Length; x++)
                {
                    if (attrs[x].Successful)
                    {
                        Console.WriteLine("Successfully subscribed to " + attrs[x].ChannelName);
                    }
                    else
                    {
                        Console.WriteLine("Failed to subscribe to " + attrs[x].ChannelName + " : " + attrs[x].Exception.Message);
                        Console.WriteLine(attrs[x].Exception.StackTrace);
                    }
                }

                // Stay subscribed until the user presses any key
                Console.WriteLine("Press any key to quit !");
                Console.Read();
                Console.WriteLine("Finished. Consumed total of " + totalMsgs);
            }
            catch (Exception nbce)
            {
                Console.WriteLine("An error occured while creating the Channel Attributes object.");
                Console.WriteLine(nbce.Message);
                exitCode = 1;
            }

            // Close the session we opened; a failure here is reported but must not
            // prevent the factory shutdown below.
            try
            {
                nSessionFactory.close(mySession);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to close the session cleanly: " + ex.Message);
            }

            // Close any other sessions within this App so that we can exit
            nSessionFactory.shutdown();

            if (exitCode != 0)
            {
                Environment.Exit(exitCode);
            }
        }

        /// <summary>
        /// Parses the command line and runs the sample.
        /// Expected arguments: rname, channel (folder) name, then optionally
        /// start eid, debug level, report count and selector, in that order.
        /// Prints the usage text and exits with code 2 when fewer than two
        /// arguments are given or a numeric argument cannot be parsed.
        /// </summary>
        /// <param name="args">the raw command line arguments</param>
        protected override void processArgs(String[] args)
        {
            // Need a min of 2, rname, channel name
            if (args.Length < 2)
            {
                Usage();
                Environment.Exit(2);
            }

            String RNAME = args[0];
            String channelName = args[1];

            // Optional parameters and their defaults
            long start = 0;      // event ids are passed to doit() as long, so parse them as 64 bit
            int loglvl = 3;
            int report = 1000;
            String sel = null;

            try
            {
                if (args.Length > 2)
                {
                    start = Convert.ToInt64(args[2]);
                }
                if (args.Length > 3)
                {
                    loglvl = Convert.ToInt32(args[3]);
                }
                if (args.Length > 4)
                {
                    report = Convert.ToInt32(args[4]);
                }
            }
            catch (FormatException fe)
            {
                ExitWithUsage(fe.Message);
            }
            catch (OverflowException oe)
            {
                ExitWithUsage(oe.Message);
            }

            // Check for a selector message filter value
            if (args.Length > 5)
            {
                sel = args[5];
            }

            // Run the sample app
            mySelf.doit(parseRealmProperties(RNAME), channelName, sel, start, loglvl, report);
        }

        /// <summary>
        /// Reports an invalid command line argument, prints the usage text and
        /// terminates the process with exit code 2.
        /// </summary>
        /// <param name="problem">description of what was wrong with the argument</param>
        private static void ExitWithUsage(string problem)
        {
            Console.WriteLine("Invalid argument : " + problem);
            Usage();
            Environment.Exit(2);
        }

        /// <summary>
        /// Program entry point: creates the sample instance and hands it the command line.
        /// </summary>
        /// <param name="args">the raw command line arguments</param>
        public static void Main(String[] args)
        {
            // Create an instance for this class
            mySelf = new sessionsubscriber();

            // Process command line arguments
            mySelf.processArgs(args);
        }

        /// <summary>
        /// Prints the usage message for this class
        /// </summary>
        private static void Usage()
        {
            Console.WriteLine("Usage ...\n");
            Console.WriteLine("sessionsubscriber <rname> <channel name> [start eid] [debug] [count] [selector] \n");
            Console.WriteLine("<Required Arguments> \n");
            Console.WriteLine("<rname> - the rname of the server to connect to");
            Console.WriteLine("<channel name> - Folder name parameter for the location of the channels to subscribe to");
            Console.WriteLine("\n[Optional Arguments] \n");
            Console.WriteLine("[start eid] - The Event ID to start subscribing from");
            Console.WriteLine("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All");
            Console.WriteLine("[count] - The number of events to wait before printing out summary information");
            Console.WriteLine("[selector] - The event filter string to use");
        }

        /// <summary>
        /// A callback is received by the API to this method each time an event is received from
        /// the nirvana channel. Be careful not to spend too much time processing the message
        /// inside this method, as until it exits the next message can not be pushed.
        /// </summary>
        /// <param name="evt">An nConsumeEvent object containing the message received from the channel</param>
        public void go(nConsumeEvent evt)
        {
            // If this is the first message we receive
            if (count == -1)
            {
                // Get a timestamp to be used for message rate calculations
                startTime = DateTime.Now;
                count = 0;
            }

            // Increment the counters
            count++;
            totalMsgs++;

            // Have we reached the point where we need to report the rates?
            if (count == reportCount)
            {
                // Reset the counter
                count = 0;

                // Get a timestamp to calculate the rates
                DateTime end = DateTime.Now;

                // Does the specified log level permit us to print on the screen?
                if (logLevel >= 1)
                {
                    // Dump the rates on the screen
                    if (!end.Equals(startTime))
                    {
                        TimeSpan ts = end - startTime;
                        double milli = ts.TotalMilliseconds;

                        // Multiply in floating point so a large report count cannot overflow int
                        Console.WriteLine("Received " + reportCount + " in " + (milli) + " Evt/Sec = " + ((reportCount * 1000.0) / (milli)) + " Bytes/sec=" + ((byteCount * 1000) / (milli)));
                        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
                    }
                    else
                    {
                        Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
                    }
                }

                // Set the startTime for the next report equal to the end timestamp for the previous one
                startTime = end;

                // Reset the byte counter
                byteCount = 0;
            }

            // If the last EID counter is not equal to the current event ID
            if (lastEID != evt.getEventID())
            {
                // Maybe we have missed an event, so print a message on the screen.
                // This message could be printed for a number of other reasons.
                // One of them would be someone purging a range creating an 'eid gap'.
                // As eids are never reused within a channel you could have a situation
                // where this gets printed but nothing is missed.
                // NOTE(review): lastEID is one counter for all subscribed channels, so with
                // more than one channel this message is presumably also triggered by
                // interleaved events from different channels - confirm before relying on it.
                Console.WriteLine("Expired event range " + (lastEID) + " - " + (evt.getEventID() - 1));

                // Reset the last eid counter
                lastEID = evt.getEventID() + 1;
            }
            else
            {
                // Increment the last eid counter
                lastEID++;
            }

            // Get the data of the message
            byte[] buffer = evt.getEventData();
            if (buffer != null)
            {
                // Add its length to the byte counter
                byteCount += buffer.Length;
            }

            // If the loglevel permits printing on the screen
            if (logLevel >= 2)
            {
                // Print the eid
                Console.WriteLine("Event id : " + evt.getEventID());
                if (evt.isEndOfChannel())
                {
                    Console.WriteLine("End of channel reached");
                }

                // If the loglevel permits printing on the screen
                if (logLevel >= 3)
                {
                    // Print the message tag
                    Console.WriteLine("Event tag : " + evt.getEventTag());

                    // Print the message data; an event without a payload prints as empty
                    // because Encoding.GetString rejects a null byte array
                    string data = (buffer != null) ? encoding.GetString(buffer) : "";
                    Console.WriteLine("Event data : " + data);

                    if (evt.hasAttributes())
                    {
                        displayEventAttributes(evt.getAttributes());
                    }

                    nEventProperties prop = evt.getProperties();
                    if (prop != null)
                    {
                        displayEventProperties(prop);
                    }
                }
            }
        }
    } // End of sessionsubscriber Class
}