/*
 *
 *   Copyright (c) 1999 - 2011 my-Channels Ltd
 *   Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 *   Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
using System;
using System.IO;
using System.Threading;
using com.pcbsys.nirvana.client;

namespace com.pcbsys.nirvana.apps
{
    /**
     * Creates a synchronous queue reader and pops the queue.
     *
     * A background thread repeatedly calls pop() on the reader and hands every
     * event to go(), which keeps the counters and prints rate / event details
     * according to the configured log level.
     */
    class qreader : nSampleApp, nEventListener
    {
        private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();
        private long timeout;
        static String selector;
        private long lastEID;
        private DateTime startTime;
        private long byteCount;
        private int logLevel;
        // -1 means "no event received yet"; go() switches it to 0 on the first event
        private int count = -1;
        private int totalMsgs;
        private int reportCount = 1000;
        private bool isTx;
        private nQueue myQueue;
        private nQueueSyncReader reader;
        private Thread qPopper;
        private static qreader mySelf;
        // Set by doit() just before the reader is destroyed so that the pop thread
        // can tell an intentional shutdown apart from a real failure.
        private volatile bool stopping;

        /**
         * This method demonstrates the Nirvana API calls necessary to create a
         * synchronous queue reader.
         *
         * It is called after all command line arguments have been received and
         * validated. It blocks on Console.ReadLine() until the user presses
         * enter, then destroys the reader and closes the session.
         *
         * @param realmDetails  a String[] containing the possible RNAME values
         * @param aqueueName    the queue name to pop
         * @param time          the timeout passed to each synchronous pop call
         * @param loglvl        the specified log level
         * @param repCount      the specified report count
         * @param transactional whether a transactional reader is created (each event is then committed in go())
         * @param sel           the pop selector filter
         */
        private void doit(String[] realmDetails, String aqueueName, long time, int loglvl, int repCount, bool transactional, String sel)
        {
            logLevel = loglvl;
            reportCount = repCount;
            timeout = time;
            isTx = transactional;

            constructSession(realmDetails);

            //Subscribes to the specified queue
            try
            {
                //Create a channel attributes object
                var nca = new nChannelAttributes();
                nca.setName(aqueueName);

                //Obtain the queue reference
                myQueue = mySession.findQueue(nca);

                //output queue details
                nQueueDetails details = myQueue.getDetails();
                Console.WriteLine("Current queue size = " + details.getNoOfEvents());
                Console.WriteLine("Current queue age = " + (details.getLastEventTime() - details.getFirstEventTime()));
                Console.WriteLine("Current storage size = " + details.getTotalMemorySize());
                Console.WriteLine("Current readers = " + details.getNoOfReaders());

                //create the queue reader
                if (isTx)
                {
                    reader = myQueue.createTransactionalReader(new nQueueReaderContext(this, sel));
                }
                else
                {
                    reader = myQueue.createReader(new nQueueReaderContext(this, sel));
                }

                //Configure the pop thread completely before starting it
                qPopper = new Thread(new ThreadStart(run));
                qPopper.IsBackground = true;
                qPopper.Name = "Q Reader thread";
                qPopper.Start();

                //Stay subscribed until the user presses enter
                Console.ReadLine();
                Console.WriteLine("Finished. Consumed total of " + totalMsgs);

                //Tell the pop thread that the failure it is about to see is expected
                stopping = true;

                //Destroy the queue reader
                nQueue.destroyReader(reader);
            }
            //Handle errors
            catch (nChannelNotFoundException)
            {
                Console.WriteLine("The queue specified could not be found.");
                Console.WriteLine("Please ensure that the queue exists in the REALM you connect to.");
                Environment.Exit(1);
            }
            catch (nSecurityException)
            {
                Console.WriteLine("Unsufficient permissions for the requested operation.");
                Console.WriteLine("Please check the ACL settings on the server.");
                Environment.Exit(1);
            }
            catch (nSessionNotConnectedException)
            {
                Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
                Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
                Environment.Exit(1);
            }
            catch (nUnexpectedResponseException)
            {
                Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
                Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
                Environment.Exit(1);
            }
            catch (nUnknownRemoteRealmException)
            {
                Console.WriteLine("The queue specified resided in a remote realm which could not be found.");
                Console.WriteLine("Please ensure the queue name specified is correct.");
                Environment.Exit(1);
            }
            catch (nRequestTimedOutException)
            {
                Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
                Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
                Environment.Exit(1);
            }
            catch (nBaseClientException)
            {
                Console.WriteLine("An error occured while creating the Channel Attributes object.");
                Environment.Exit(1);
            }
            catch (IOException e)
            {
                Console.WriteLine(e.StackTrace);
            }

            //Close the session we opened
            try
            {
                nSessionFactory.close(mySession);
            }
            catch (Exception)
            {
                //Best effort only: we are shutting down anyway
            }

            //Close any other sessions so that we can exit
            nSessionFactory.shutdown();
        }

        /**
         * Parses the command line and starts the reader.
         *
         * Expected layout: rname, queue name, then the optional debug level,
         * timeout, transactional flag, selector and report count (in that order).
         * Prints the usage text and exits with code 2 when fewer than two
         * arguments are supplied or an optional value cannot be converted.
         *
         * @param args the raw command line arguments
         */
        protected override void processArgs(String[] args)
        {
            // Need a min of 2, rname, channel name
            if (args.Length < 2)
            {
                Usage();
                Environment.Exit(2);
            }

            String RNAME = args[0];
            String queueName = args[1];

            int time = 10000;
            int loglvl = 1;
            int repCount = 1000;
            bool transactional = false;

            try
            {
                if (args.Length > 2)
                {
                    loglvl = Convert.ToInt32(args[2]);
                }
                if (args.Length > 3)
                {
                    time = Convert.ToInt32(args[3]);
                }
                if (args.Length > 4)
                {
                    transactional = Convert.ToBoolean(args[4]);
                }
                //Check for a selector message filter value
                if (args.Length > 5)
                {
                    selector = args[5];
                }
                if (args.Length > 6)
                {
                    repCount = Convert.ToInt32(args[6]);
                }
            }
            catch (FormatException)
            {
                //A numeric or boolean argument was malformed
                Usage();
                Environment.Exit(2);
            }
            catch (OverflowException)
            {
                //A numeric argument does not fit into an int
                Usage();
                Environment.Exit(2);
            }

            //Process the local REALM RNAME details and start reading
            doit(parseRealmProperties(RNAME), queueName, time, loglvl, repCount, transactional, selector);
        }

        /**
         * Program entry point: creates the reader instance and hands it the
         * command line.
         *
         * @param args the raw command line arguments
         */
        public static void Main(String[] args)
        {
            //Create an instance for this class
            mySelf = new qreader();
            mySelf.processArgs(args);
        }

        /**
         * This method is invoked from the queue receive thread each time an event is received from
         * the nirvana channel.
         *
         * It commits the event when the reader is transactional, updates the
         * message / byte counters, prints a rate summary every reportCount
         * events and prints per-event details depending on the log level.
         *
         * @param evt An nConsumeEvent object containing the message received from the channel
         */
        public void go(nConsumeEvent evt)
        {
            if (isTx)
            {
                ((nQueueSyncTransactionReader)reader).commit(evt.getEventID());
            }

            //If this is the first message we receive
            if (count == -1)
            {
                //Get a timestamp to be used for message rate calculations.
                //UTC is used so that local clock shifts (DST) cannot distort the interval.
                startTime = DateTime.UtcNow;
                count = 0;
            }

            //Increment the counters
            count++;
            totalMsgs++;

            //Account for this event's payload before reporting, so that the byte
            //rate covers exactly the same events as the event rate
            byte[] buffer = evt.getEventData();
            if (buffer != null)
            {
                byteCount += buffer.Length;
            }

            //Have we reached the point where we need to report the rates?
            if (count == reportCount)
            {
                //Reset the counter
                count = 0;

                //Get a timestamp to calculate the rates
                DateTime end = DateTime.UtcNow;

                //Does the specified log level permit us to print on the screen?
                if (logLevel >= 1)
                {
                    //TotalMilliseconds is the whole interval; Milliseconds would only be
                    //the 0-999 component and could be zero for an interval of exact seconds
                    long elapsedMs = (long)(end - startTime).TotalMilliseconds;
                    if (elapsedMs > 0)
                    {
                        //Dump the rates on the screen (long arithmetic avoids int overflow)
                        Console.WriteLine("Received " + reportCount + " in " + elapsedMs +
                                          " Evt/Sec = " + ((reportCount * 1000L) / elapsedMs) +
                                          " Bytes/sec=" + ((byteCount * 1000) / elapsedMs));
                        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() +
                                          "] Bytes Rx [" + mySession.getInputByteCount() + "]");
                    }
                    else
                    {
                        Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
                    }
                }

                //Set the startTime for the next report equal to the end timestamp for the previous one
                startTime = end;

                //Reset the byte counter
                byteCount = 0;
            }

            //If the last EID counter is not equal to the current event ID
            if (lastEID != evt.getEventID())
            {
                //If yes, maybe we have missed an event, so print a message on the screen.
                //This message could be printed for a number of other reasons.
                //One of them would be someone purging a range creating an 'eid gap'.
                //As eids are never reused within a channel you could have a situation
                //where this gets printed but nothing is missed.
                Console.WriteLine("Expired event range " + (lastEID) + " - " + (evt.getEventID() - 1));

                //Reset the last eid counter
                lastEID = evt.getEventID() + 1;
            }
            else
            {
                //Increment the last eid counter
                lastEID++;
            }

            //If the loglevel permits printing on the screen
            if (logLevel >= 2)
            {
                //Print the eid
                Console.WriteLine("Event id : " + evt.getEventID());
                if (evt.isEndOfChannel())
                {
                    Console.WriteLine("End of channel reached");
                }

                //If the loglevel permits printing the full event
                if (logLevel >= 3)
                {
                    //Print the message tag
                    Console.WriteLine("Event tag : " + evt.getEventTag());

                    //Print the message data; an event without a payload prints as empty
                    //instead of making GetString throw on null
                    Console.WriteLine("Event data : " + (buffer != null ? encoding.GetString(buffer) : ""));

                    if (evt.hasAttributes())
                    {
                        displayEventAttributes(evt.getAttributes());
                    }

                    nEventProperties prop = evt.getProperties();
                    if (prop != null)
                    {
                        displayEventProperties(prop);
                    }
                }
            }
        }

        /**
         * Body of the pop thread: pops events with the configured timeout and
         * passes each non-null event to go().
         *
         * Any exception terminates the process with exit code 1, unless doit()
         * has already flagged the shutdown, in which case the thread simply ends.
         */
        public void run()
        {
            while (!stopping)
            {
                try
                {
                    nConsumeEvent evt = reader.pop(timeout);
                    if (evt != null)
                    {
                        go(evt);
                    }
                }
                catch (Exception e)
                {
                    if (stopping)
                    {
                        //The reader was destroyed on purpose; nothing to report
                        return;
                    }
                    Console.WriteLine("Exception in pop....exiting!");
                    Console.WriteLine(e.StackTrace);
                    Environment.Exit(1);
                }
            }
        }

        /**
         * Prints the usage message for this class
         */
        private static void Usage()
        {
            Console.WriteLine("qreader <rname> <queue name> [debug] [timeout] [transactional] [selector] [count] \n");
            Console.WriteLine("<Required Arguments> \n");
            Console.WriteLine("<rname> - the rname of the server to connect to");
            Console.WriteLine("<queue name> - Queue name to pop from");
            Console.WriteLine("\n[Optional Arguments] \n");
            Console.WriteLine("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All");
            Console.WriteLine("[timeout] - The timeout for the synchronous pop call");
            Console.WriteLine("[transactional] - true / false whether the subscriber is transactional, if true, each event consumed will be ack'd to confirm receipt");
            Console.WriteLine("[selector] - The event filter string to use");
            Console.WriteLine("[count] - The number of events to wait before printing out summary information");
        }
    }
}