/*
 *
 *   Copyright (c) 1999 - 2011 my-Channels Ltd
 *   Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 *   Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
using System;
using System.Threading;
using com.pcbsys.nirvana.client;

namespace com.pcbsys.nirvana.apps
{
    /**
     * Sample application that consumes events from a channel through a durable
     * (named or shared) channel iterator, acknowledging every event as it is read
     * and periodically printing throughput figures.
     *
     * Session handling and the display helpers used below (constructSession,
     * parseRealmProperties, displayEventAttributes, displayEventProperties, mySession)
     * are not declared in this file; presumably they are inherited from nSampleApp.
     */
    class namedchanneliterator : nSampleApp, nEventListener
    {
        private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();

        // Monotonic clock used for rate calculations. Wall-clock time can jump and
        // DateTime.Now.Millisecond only yields the 0-999 component of the current time.
        private static readonly System.Diagnostics.Stopwatch clock = System.Diagnostics.Stopwatch.StartNew();

        private string channelName;
        private string RNAME;

        // Command line options (see usage()).
        static long startEid = nDurable.DEFAULT_START_EID;
        static String selector;
        static String subname;
        static bool cluster;
        static bool persistent;
        static bool shared;

        private long lastEID;
        private long startTime;
        private long byteCount;
        // -1 means "no event received yet"; go() switches it to 0 on the first event.
        private int count = -1;
        private int totalMsgs;
        private int logLevel = 1;
        private int reportCount = 1000;

        private nChannelIterator iterator;
        private Thread channelReader;
        private nChannel myChannel;
        private static namedchanneliterator mySelf;
        private nDurable durable;

        /**
         * This method contains the Nirvana calls necessary to implement a channel iterator as a durable
         * subscriber. It finds the channel, registers the durable, starts the background reader
         * thread and then blocks until the user presses enter. Any Nirvana client exception is
         * reported on the console and terminates the process with exit code 1.
         *
         * @param realmDetails An array of RNAME values
         * @param achannelName The name of the channel to connect to
         */
        public void doit(String[] realmDetails, String achannelName)
        {
            mySelf.constructSession(realmDetails);

            //Subscribes to the specified channel
            try
            {
                //Create a channel attributes object
                nChannelAttributes nca = new nChannelAttributes();
                nca.setName(achannelName);

                //Obtain the channel reference
                myChannel = mySession.findChannel(nca);

                //Create a durable object
                nDurableAttributes.nDurableType durableType = shared
                    ? nDurableAttributes.nDurableType.Shared
                    : nDurableAttributes.nDurableType.Named;
                nDurableAttributes attributes = nDurableAttributes.create(durableType, subname);
                attributes.setClustered(cluster);
                attributes.setPersistent(persistent);
                attributes.setStartEid(startEid);
                if (!String.IsNullOrEmpty(selector))
                {
                    attributes.setSelector(selector);
                }
                durable = myChannel.getDurableManager().add(attributes);

                //create the channel iterator object and the read thread
                iterator = myChannel.createIterator(durable, selector);
                channelReader = new Thread(new ThreadStart(run));
                // Background thread so it never keeps the process alive on exit.
                channelReader.IsBackground = true;
                channelReader.Start();

                //Stay subscribed until the user presses any key
                Console.WriteLine("Press enter to quit !");
                try
                {
                    Console.In.Read();
                }
                catch (Exception)
                {
                    //Ignore this: a failed console read simply ends the wait
                }
                Console.WriteLine("Finished. Consumed total of " + totalMsgs);
            }
            //Handle errors (specific exception types first, nBaseClientException last)
            catch (nChannelNotFoundException cnfe)
            {
                reportFatal(cnfe,
                    "The channel specified could not be found.",
                    "Please ensure that the channel exists in the REALM you connect to.");
            }
            catch (nSecurityException se)
            {
                reportFatal(se,
                    "Unsufficient permissions for the requested operation.",
                    "Please check the ACL settings on the server.");
            }
            catch (nSessionNotConnectedException snce)
            {
                reportFatal(snce,
                    "The session object used is not physically connected to the Nirvana Realm.",
                    "Please ensure the realm is running and check your RNAME value.");
            }
            catch (nUnexpectedResponseException ure)
            {
                reportFatal(ure,
                    "The Nirvana REALM has returned an unexpected response.",
                    "Please ensure the Nirvana REALM and client API used are compatible.");
            }
            catch (nUnknownRemoteRealmException urre)
            {
                reportFatal(urre,
                    "The channel specified resided in a remote realm which could not be found.",
                    "Please ensure the channel name specified is correct.");
            }
            catch (nRequestTimedOutException rtoe)
            {
                reportFatal(rtoe,
                    "The requested operation has timed out waiting for a response from the REALM.",
                    "If this is a very busy REALM ask your administrator to increase the client timeout values.");
            }
            catch (nNameAlreadyBoundException e)
            {
                reportFatal(e, "Durable object has already been bound.");
            }
            catch (nBaseClientException nbce)
            {
                reportFatal(nbce, "An error occured while creating the Channel Attributes object.");
            }

            //Close the session we opened
            try
            {
                nSessionFactory.close(mySession);
            }
            catch (Exception)
            {
                // Best effort: we are shutting down regardless of whether close succeeds.
            }

            //Close any other sessions so that we can exit
            nSessionFactory.shutdown();
        }

        /**
         * Prints the given explanation lines followed by the exception's stack trace
         * and terminates the process with exit code 1.
         *
         * @param ex       The exception that aborted the subscription
         * @param messages Lines printed before the stack trace
         */
        private static void reportFatal(Exception ex, params String[] messages)
        {
            foreach (String message in messages)
            {
                Console.WriteLine(message);
            }
            Console.WriteLine(ex.StackTrace);
            Environment.Exit(1);
        }

        /**
         * Returns true when the argument is the word "true" in any letter case.
         * Uses an ordinal comparison so the result does not depend on the current culture.
         */
        private static bool isTrue(String value)
        {
            return "true".Equals(value, StringComparison.OrdinalIgnoreCase);
        }

        /**
         * Reads the positional command line arguments. The first two (rname and channel
         * name) are required; when they are missing the usage text is printed and the
         * process exits with code 0. Every further argument is optional and overrides
         * the corresponding default. Arguments beyond the tenth are ignored.
         *
         * Numeric arguments are converted with Convert.ToInt64 / Convert.ToInt32, so a
         * malformed number surfaces as the exception those methods throw.
         *
         * @param args The command line arguments
         */
        protected override void processArgs(String[] args)
        {
            if (args.Length < 2)
            {
                //not all required arguments specified
                usage();
                Environment.Exit(0);
            }

            RNAME = args[0];
            channelName = args[1];

            if (args.Length > 2)
            {
                subname = args[2];
            }
            if (args.Length > 3)
            {
                startEid = Convert.ToInt64(args[3]);
            }
            if (args.Length > 4)
            {
                logLevel = Convert.ToInt32(args[4]);
            }
            if (args.Length > 5)
            {
                reportCount = Convert.ToInt32(args[5]);
            }
            if (args.Length > 6)
            {
                cluster = isTrue(args[6]);
            }
            if (args.Length > 7)
            {
                persistent = isTrue(args[7]);
            }
            if (args.Length > 8)
            {
                selector = args[8];
            }
            if (args.Length > 9)
            {
                shared = isTrue(args[9]);
            }
        }

        /**
         * Program entry point: parses the arguments, resolves the realm details and
         * starts the durable subscription.
         *
         * @param args The command line arguments (see usage())
         */
        public static void Main(String[] args)
        {
            //create a new instance for this class
            mySelf = new namedchanneliterator();

            //Process command line arguments; the durable name defaults to the OS user name
            subname = Environment.UserName;
            mySelf.processArgs(args);
            nSampleApp.processEnvironmentVariables();

            //Process the local REALM RNAME details
            String[] rproperties = parseRealmProperties(mySelf.RNAME);

            //Subscribe to the channel specified
            mySelf.doit(rproperties, mySelf.channelName);
        }

        /**
         * Process the event: acknowledge it on the durable, update the counters, print a
         * rate report every reportCount events and, depending on the log level, dump the
         * event itself.
         *
         * @param evt An nConsumeEvent object containing the message received from the channel
         */
        public void go(nConsumeEvent evt)
        {
            // each event is acknowledged as it is received
            try
            {
                durable.acknowledge(evt.getEventIdentifier(), true);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.StackTrace);
            }

            //If this is the first message we receive
            if (count == -1)
            {
                //Get a timestamp to be used for message rate calculations
                startTime = clock.ElapsedMilliseconds;
                count = 0;
            }

            //Increment the counter
            count++;
            totalMsgs++;

            //Have we reached the point where we need to report the rates?
            if (count == reportCount)
            {
                //Reset the counter
                count = 0;

                //Get a timestamp to calculate the rates
                long end = clock.ElapsedMilliseconds;
                long elapsed = end - startTime;

                // Does the specified log level permit us to print on the screen?
                if (logLevel >= 1)
                {
                    //Dump the rates on the screen
                    if (elapsed > 0)
                    {
                        // 1000L forces 64-bit arithmetic so large report counts cannot overflow.
                        Console.WriteLine("Received " + reportCount + " in " + elapsed
                                          + " Evt/Sec = " + ((reportCount * 1000L) / elapsed)
                                          + " Bytes/sec=" + ((byteCount * 1000) / elapsed));
                        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount()
                                          + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
                    }
                    else
                    {
                        Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
                    }
                }

                //Set the startTime for the next report equal to the end timestamp for the previous one
                startTime = end;

                //Reset the byte counter
                byteCount = 0;
            }

            //If the last EID counter is not equal to the current event ID
            long eventId = evt.getEventID();
            if (lastEID != eventId)
            {
                //If yes, maybe we have missed an event, so print a message on the screen.
                //This message could be printed for a number of other reasons.
                //One of them would be someone purging a range creating an 'eid gap'.
                //As eids are never reused within a channel you could have a situation
                //where this gets printed but nothing is missed.
                Console.WriteLine("Expired event range " + (lastEID) + " - " + (eventId - 1));

                //Reset the last eid counter
                lastEID = eventId + 1;
            }
            else
            {
                //Increment the last eid counter
                lastEID++;
            }

            //Get the data of the message
            byte[] buffer = evt.getEventData();
            if (buffer != null)
            {
                //Add its length to the byte counter
                byteCount += buffer.Length;
            }

            //If the loglevel permits printing on the screen
            if (logLevel >= 2)
            {
                //Print the eid
                Console.WriteLine("Event id : " + eventId);

                if (logLevel >= 3)
                {
                    //Print the message tag
                    Console.WriteLine("Event tag : " + evt.getEventTag());

                    //Print the message data. An event without a payload prints as empty text;
                    //decoding a null buffer would throw and silently stop the reader loop in run().
                    Console.WriteLine("Event data : " + (buffer != null ? encoding.GetString(buffer) : ""));

                    if (evt.hasAttributes())
                    {
                        displayEventAttributes(evt.getAttributes());
                    }

                    nEventProperties prop = evt.getProperties();
                    if (prop != null)
                    {
                        displayEventProperties(prop);
                    }
                }
            }
        }

        /**
         * Thread run loop performing the channel iteration. Each event returned by the
         * iterator is handed to go(). A request timeout is treated as "nothing to consume"
         * and the loop continues; any other exception ends the loop.
         */
        public void run()
        {
            while (true)
            {
                try
                {
                    //extract the next event from the channel
                    //NOTE(review): -1 presumably means "wait without timeout" - confirm against the API docs
                    nConsumeEvent evt = iterator.getNext(-1);
                    if (evt != null)
                    {
                        go(evt);
                    }
                }
                catch (nRequestTimedOutException)
                {
                    // no event to consume
                }
                catch (Exception)
                {
                    // Any other failure stops the reader thread.
                    break;
                }
            }
        }

        /**
         * Prints the usage message for this class which gives details of the required arguments etc.
         */
        private static void usage()
        {
            Console.WriteLine("Usage ...\n");
            Console.WriteLine("namedchanneliterator <rname> <channel name> [name] [start eid] [debug] [count] [cluster wide] [persistent] [selector] [shared] \n");
            Console.WriteLine("<Required Arguments> \n");
            Console.WriteLine("<rname> - the rname of the server to connect to");
            Console.WriteLine("<channel name> - Channel name parameter for the channel to subscribe to");
            Console.WriteLine("\n[Optional Arguments] \n");
            Console.WriteLine("[name] - Specifies the unique name to be used for a durable subscription (default: OS username) ");
            Console.WriteLine("[start eid] - The Event ID to start subscribing from if name subscriber is to be created (doesn't already exist)");
            Console.WriteLine("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All");
            Console.WriteLine("[count] - The number of events to wait for before printing out summary information (default: 1000) ");
            Console.WriteLine("[cluster wide] - Specifies whether the durable object is to be used across a cluster (default: false) ");
            Console.WriteLine("[persistent] - Specifies whether the durable object state is to be stored to disk or held in server memory (default: false) ");
            Console.WriteLine("[selector] - The event filter string to use");
            Console.WriteLine("[shared] - Whether the durable subscriber is shared (default: false).");
        }
    }
}