/*
 *
 * Copyright (c) 1999 - 2011 my-Channels Ltd
 * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using com.pcbsys.nirvana.client;

namespace com.pcbsys.nirvana.apps
{
    /**
     * Sample application that opens one session, creates a second session by
     * multiplexing the first, and subscribes this same listener to the named
     * channel through both sessions. Received events are counted and, depending
     * on the log level, rates and event details are printed to the console.
     */
    class multiplex : nSampleApp, nEventListener
    {
        // Name of the channel to subscribe to (args[1]).
        String channelName;

        // Event ID to start subscribing from (args[2]); -1 means "start from the channel's last EID".
        long startEID = -1;

        // Event ID the next event is expected to carry; used for gap detection in go().
        private long lastEID = 0;

        // Timestamp (ms) at which the current reporting window started.
        private long startTime = 0;

        // Payload bytes received in the current reporting window.
        private long byteCount = 0;

        // Realm connection string (args[0]).
        private String rname;

        // Console verbosity: 0 - none, 1 - summary, 2 - EIDs, 3 - all.
        private int logLevel = 0;

        // Events received in the current reporting window; -1 until the first event arrives.
        private int count = -1;

        // Total number of events received since start.
        private int totalMsgs = 0;

        // Number of events per reporting window (args[4]).
        private int reportCount = 10000;

        private nChannel myChannel;
        private static multiplex mySelf = null;

        // Optional message selector (args[5]); null means no filtering is requested.
        public String Selector;

        /**
         * Connects both sessions, subscribes to the channel on each of them and blocks
         * until the user presses a key, then unsubscribes and closes the sessions.
         *
         * @param realmDetails  realm connection details handed to nSessionAttributes
         * @param achannelName  name of the channel to subscribe to
         * @param selector      message selector, or null
         * @param startEid      event ID to start from; -1 selects the channel's last EID
         * @param loglvl        console verbosity level
         * @param repCount      events per rate report; values <= 0 keep the current setting
         */
        private void doit(String[] realmDetails, String achannelName, String selector, long startEid, int loglvl, int repCount)
        {
            logLevel = loglvl;

            // A non-positive interval could never be matched by the counter in go(),
            // so it must not replace the existing (default) interval.
            if (repCount > 0)
            {
                reportCount = repCount;
            }

            nSession otherSession = null;
            nChannel otherChannel = null;

            try
            {
                nsa = new nSessionAttributes(realmDetails, 2);

                // Construct the first session.
                mySession = nSessionFactory.create(nsa, this);
                mySession.init();

                // Construct the second session by multiplexing the first one.
                otherSession = nSessionFactory.createMultiplexed(mySession);
                otherSession.init();

                // Look the channel up through both sessions.
                nChannelAttributes nca = new nChannelAttributes();
                nca.setName(achannelName);
                myChannel = mySession.findChannel(nca);
                otherChannel = otherSession.findChannel(nca);

                // -1 means "latest": resolve it to the last EID currently on the channel.
                if (startEid == -1)
                {
                    startEid = myChannel.getLastEID();
                }

                // Subscribe this object on both channel references with the same
                // selector and start EID.
                myChannel.addSubscriber(this, selector, startEid);
                otherChannel.addSubscriber(this, selector, startEid);

                // Stay subscribed until the user presses any key.
                Console.WriteLine("Press any key to quit !");
                Console.Read();
                Console.WriteLine("Finished. Consumed total of " + totalMsgs);

                // Remove both subscriptions (the original only removed the first one).
                myChannel.removeSubscriber(this);
                otherChannel.removeSubscriber(this);
            }
            catch (nChannelNotFoundException cnfe)
            {
                reportFatal(cnfe,
                    "The channel specified could not be found.",
                    "Please ensure that the channel exists in the REALM you connect to.");
            }
            catch (nSecurityException se)
            {
                reportFatal(se,
                    "Unsufficient permissions for the requested operation.",
                    "Please check the ACL settings on the server.");
            }
            catch (nSessionNotConnectedException snce)
            {
                reportFatal(snce,
                    "The session object used is not physically connected to the Nirvana Realm.",
                    "Please ensure the realm is running and check your RNAME value.");
            }
            catch (nUnexpectedResponseException ure)
            {
                reportFatal(ure,
                    "The Nirvana REALM has returned an unexpected response.",
                    "Please ensure the Nirvana REALM and client API used are compatible.");
            }
            catch (nUnknownRemoteRealmException urre)
            {
                reportFatal(urre,
                    "The channel specified resided in a remote realm which could not be found.",
                    "Please ensure the channel name specified is correct.");
            }
            catch (nRequestTimedOutException rtoe)
            {
                reportFatal(rtoe,
                    "The requested operation has timed out waiting for a response from the REALM.",
                    "If this is a very busy REALM ask your administrator to increase the client timeout values.");
            }
            catch (nChannelAlreadySubscribedException chase)
            {
                reportFatal(chase,
                    "You are already subscribed to this channel.");
            }
            catch (nSelectorParserException spe)
            {
                reportFatal(spe,
                    "An error occured while parsing the selector filter specified.",
                    "Please check the JMS documentation on how to write a valid selector.");
            }
            catch (nBaseClientException nbce)
            {
                reportFatal(nbce,
                    "An error occured while creating the Channel Attributes object.");
            }

            // Close the sessions we opened. Each close is attempted independently so
            // that a failure on one does not prevent the other from being closed.
            if (mySession != null)
            {
                try
                {
                    nSessionFactory.close(mySession);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Failed to close the primary session: " + ex.Message);
                }
            }
            if (otherSession != null)
            {
                try
                {
                    nSessionFactory.close(otherSession);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Failed to close the multiplexed session: " + ex.Message);
                }
            }

            // Close any other sessions within this process so that we can exit.
            nSessionFactory.shutdown();
        }

        /**
         * Prints the given explanation lines followed by the exception's stack trace
         * and terminates the process with exit code 1.
         *
         * @param e         the exception that aborted the sample
         * @param messages  lines to print before the stack trace
         */
        private static void reportFatal(Exception e, params String[] messages)
        {
            foreach (String message in messages)
            {
                Console.WriteLine(message);
            }
            Console.WriteLine(e.StackTrace);
            Environment.Exit(1);
        }

        /**
         * Reads the positional command line arguments:
         * rname, channel name, [start eid], [debug], [count], [selector].
         * Each case falls through to the next lower one so that all supplied
         * arguments are consumed.
         *
         * @param args the command line arguments
         */
        protected override void processArgs(String[] args)
        {
            switch (args.Length)
            {
                case 6:
                    Selector = args[5];
                    goto case 5;
                case 5:
                    // This is the reporting interval. It must not be written to 'count',
                    // which go() uses as the running counter with -1 as its "first
                    // message" sentinel.
                    int requestedInterval = Convert.ToInt32(args[4]);
                    if (requestedInterval > 0)
                    {
                        reportCount = requestedInterval;
                    }
                    goto case 4;
                case 4:
                    logLevel = Convert.ToInt32(args[3]);
                    goto case 3;
                case 3:
                    // Event IDs are 64-bit values.
                    startEID = Convert.ToInt64(args[2]);
                    goto case 2;
                case 2:
                    channelName = args[1];
                    goto case 1;
                case 1:
                    if (args[0].Equals("-?"))
                    {
                        Usage();
                        UsageEnv();
                        // UsageEnv is defined outside this file; exit explicitly in case it
                        // returns, so that "-?" is never used as a realm name.
                        Environment.Exit(1);
                    }
                    rname = args[0];
                    break;
            }
        }

        /**
         * Program entry point: parses the arguments and environment, then runs the
         * subscription until the user quits.
         *
         * @param args the command line arguments
         */
        public static new void Main(String[] args)
        {
            // Create an instance for this class.
            mySelf = new multiplex();

            // Process command line arguments and environment variables.
            mySelf.processArgs(args);
            nSampleApp.processEnvironmentVariables();

            // Without a channel name there is nothing to subscribe to.
            if (String.IsNullOrEmpty(mySelf.channelName))
            {
                Usage();
                Environment.Exit(1);
            }

            // Process the local REALM RNAME details.
            String[] rproperties = parseRealmProperties(mySelf.rname);

            // Subscribe using the values parsed from the command line.
            mySelf.doit(rproperties, mySelf.channelName, mySelf.Selector, mySelf.startEID, mySelf.logLevel, mySelf.reportCount);
        }

        /**
         * A callback is received by the API to this method each time an event is received from
         * the nirvana channel. Be careful not to spend too much time processing the message
         * inside this method, as until it exits the next message can not be pushed.
         *
         * @param evt An nConsumeEvent object containing the message received from the channel
         */
        public void go(nConsumeEvent evt)
        {
            // First message: take the timestamp used for message rate calculations.
            if (count == -1)
            {
                startTime = nConstants.currentTimeMillis();
                count = 0;
            }

            count++;
            totalMsgs++;

            // Have we reached the point where we need to report the rates?
            if (count == reportCount)
            {
                count = 0;
                long end = nConstants.currentTimeMillis();

                if (logLevel >= 1)
                {
                    long elapsed = end - startTime;
                    if (elapsed != 0)
                    {
                        // 1000L forces 64-bit arithmetic so large intervals cannot overflow.
                        Console.WriteLine("Received " + reportCount + " in " + elapsed + " Evt/Sec = " + ((reportCount * 1000L) / elapsed) + " Bytes/sec=" + ((byteCount * 1000) / elapsed));
                        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
                    }
                    else
                    {
                        Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
                    }
                }

                // The next reporting window starts where this one ended.
                startTime = end;
                byteCount = 0;
            }

            long eventId = evt.getEventID();
            if (eventId > lastEID)
            {
                // The event ID jumped ahead of the expected one, so maybe we have missed
                // an event. This can also be printed for other reasons, e.g. someone
                // purging a range and creating an 'eid gap'; as eids are never reused
                // within a channel nothing may actually have been missed.
                Console.WriteLine("Expired event range " + lastEID + " - " + (eventId - 1));
                lastEID = eventId + 1;
            }
            else if (eventId == lastEID)
            {
                lastEID++;
            }
            // An event ID below the expected one is not a gap. This listener is
            // subscribed to the same channel twice (see doit), so presumably this is
            // the second delivery of an event that was already seen.

            // Add the payload length to the byte counter.
            byte[] buffer = evt.getEventData();
            if (buffer != null)
            {
                byteCount += buffer.Length;
            }

            if (logLevel >= 2)
            {
                Console.WriteLine("Event id : " + eventId);
                if (evt.isEndOfChannel())
                {
                    Console.WriteLine("End of channel reached");
                }

                if (logLevel >= 3)
                {
                    Console.WriteLine("Event tag : " + evt.getEventTag());

                    // Convert.ToString on a byte[] only yields the type name, so decode
                    // the payload instead. NOTE(review): assumes the payload is UTF-8
                    // text - confirm against the publisher.
                    String payloadText = buffer != null ? Encoding.UTF8.GetString(buffer) : "";
                    Console.WriteLine("Event data : " + payloadText);

                    if (evt.hasAttributes())
                    {
                        displayEventAttributes(evt.getAttributes());
                    }

                    nEventProperties prop = evt.getProperties();
                    if (prop != null)
                    {
                        displayEventProperties(prop);
                    }
                }
            }
        }

        /**
         * Prints the usage message for this class
         */
        private static void Usage()
        {
            Console.WriteLine("Usage ...\n");
            Console.WriteLine("multiplex <rname> <channel name> [start eid] [debug] [count] [selector] \n");
            Console.WriteLine("<Required Arguments> \n");
            Console.WriteLine("<rname> - The RNAME of the realm to connect to");
            Console.WriteLine("<channel name> - Channel name parameter for the channel to subscribe to");
            Console.WriteLine("\n[Optional Arguments] \n");
            Console.WriteLine("[start eid] - The Event ID to start subscribing from");
            Console.WriteLine("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All");
            Console.WriteLine("[count] - The number of events to wait before printing out summary information");
            Console.WriteLine("[selector] - The event filter string to use");
            Console.WriteLine("\n\nNote: -? provides help on environment variables \n");
        }
    }
}