/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ package com.pcbsys.nirvana.apps; import com.pcbsys.nirvana.client.*; import java.io.BufferedInputStream; /** * Synchronously consumes events from a nirvana channel using a channel iterator * and an nNamedObject */ public class namedChannelIterator extends nSampleApp implements nEventListener, Runnable { static long startEid; static String selector = null; static String subname = null; static boolean cluster = false; static boolean persistent = false; private long lastEID = 0; private long startTime = 0; private long byteCount = 0; private int count = -1; private int totalMsgs = 0; private int logLevel = 0; private int reportCount = 1000; private nChannelIterator iterator = null; private Thread channelReader = null; private nChannel myChannel; private static namedChannelIterator mySelf = null; private nDurable named = null; /** * This method contains the Nirvana calls necessary to implement a channel * iterator as a named subscriber. * * @param realmDetails An array of RNAME values * @param achannelName The name of the channel to connect to * @param loglvl The level of detail that the log message should contain * @param repCount The number of events to wait for before printing out summary * information */ public void doit(String[] realmDetails, String achannelName, int loglvl, int repCount) { logLevel = loglvl; reportCount = repCount; mySelf.constructSession(realmDetails); // Subscribes to the specified channel try { // Create a channel attributes object nChannelAttributes nca = new nChannelAttributes(); nca.setName(achannelName); // Obtain the channel reference myChannel = mySession.findChannel(nca); // Create a named object nDurableAttributes nda = nDurableAttributes.create(nDurableAttributes.nDurableType.Shared, subname); nda.setStartEID(startEid); nda.setPersistent(persistent); nda.setClustered(cluster); named = myChannel.getDurableManager().add(nda); // create the channel iterator object and the read thread iterator = myChannel.createIterator(named, selector); channelReader = new Thread(this); channelReader.setDaemon(true); channelReader.start(); // Stay subscribed until the user presses any key System.out.println("Press any key to quit !"); BufferedInputStream bis = new BufferedInputStream(System.in); try { bis.read(); } catch (Exception read) { } // Ignore this System.out.println("Finished. Consumed total of " + totalMsgs); } // Handle errors catch (nChannelNotFoundException cnfe) { System.out.println("The channel specified could not be found."); System.out.println("Please ensure that the channel exists in the REALM you connect to."); cnfe.printStackTrace(); System.exit(1); } catch (nSecurityException se) { System.out.println("Insufficient permissions for the requested operation."); System.out.println("Please check the ACL settings on the server."); se.printStackTrace(); System.exit(1); } catch (nSessionNotConnectedException snce) { System.out.println("The session object used is not physically connected to the Nirvana Realm."); System.out.println("Please ensure the realm is running and check your RNAME value."); snce.printStackTrace(); System.exit(1); } catch (nUnexpectedResponseException ure) { System.out.println("The Nirvana REALM has returned an unexpected response."); System.out.println("Please ensure the Nirvana REALM and client API used are compatible."); ure.printStackTrace(); System.exit(1); } catch (nUnknownRemoteRealmException urre) { System.out.println("The channel specified resided in a remote realm which could not be found."); System.out.println("Please ensure the channel name specified is correct."); urre.printStackTrace(); System.exit(1); } catch (nRequestTimedOutException rtoe) { System.out.println("The requested operation has timed out waiting for a response from the REALM."); System.out.println("If this is a very busy REALM ask your administrator to increase the client timeout values."); rtoe.printStackTrace(); System.exit(1); } catch (nBaseClientException nbce) { System.out.println("An error occured while creating the Channel Attributes object."); nbce.printStackTrace(); System.exit(1); } // Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception e) { } // Close any other sessions within this JVM so that we can exit nSessionFactory.shutdown(); } protected void processArgs(String[] args) { switch (args.length) { case 8: System.getProperties().put("SELECTOR", args[7]); case 7: System.getProperties().put("PERSIST", args[6]); case 6: System.getProperties().put("CLUSTER", args[5]); case 5: System.getProperties().put("COUNT", args[4]); case 4: System.getProperties().put("DEBUG", args[3]); case 3: System.getProperties().put("START", args[2]); case 2: System.getProperties().put("NAME", args[1]); case 1: if (args[0].equals("-?")) { usage(); UsageEnv(); } System.getProperties().put("CHANNAME", args[0]); } } public static void main(String[] args) { // create a new instance for this class mySelf = new namedChannelIterator(); // Process command line arguments mySelf.processArgs(args); nSampleApp.processEnvironmentVariables(); String channelName = null; if (System.getProperty("CHANNAME") != null) { channelName = System.getProperty("CHANNAME"); } else { usage(); System.exit(1); } startEid = -1; // Default value (points to last event in the channel + // 1) // Check to see if a start EID value has been specified if (System.getProperty("START") != null) { try { startEid = Integer.parseInt(System.getProperty("START")); } catch (Exception num) { } // Ignore and use the defaults } int loglvl = 1; // Default value // Check to see if a LOGLEVEL value has been specified. if (System.getProperty("DEBUG") != null) { try { loglvl = Integer.parseInt(System.getProperty("DEBUG")); } catch (Exception num) { } // Ignore and use the default } int reportCount = 1000; // Default value // Check to see if a value for report count has been specified. Every // time // N events get received where N = report count, a subscription rate // report will // be printed in System.out if (System.getProperty("COUNT") != null) { try { reportCount = Integer.parseInt(System.getProperty("COUNT")); } catch (Exception num) { } // Ignore and use the default } // Check for a selector message filter value selector = System.getProperty("SELECTOR"); if (System.getProperty("NAME") != null) { subname = System.getProperty("NAME"); } else { subname = System.getProperty("user.name"); } // get whether the named object is cluster wide if (System.getProperty("CLUSTER") != null) { String sCluster = System.getProperty("CLUSTER"); if (sCluster.equalsIgnoreCase("true")) { cluster = true; } } // get whether the named object is cluster wide if (System.getProperty("PERSIST") != null) { String sPersist = System.getProperty("PERSIST"); if (sPersist.equalsIgnoreCase("true")) { persistent = true; } } // Check the local realm details String RNAME = null; if (System.getProperty("RNAME") != null) { RNAME = System.getProperty("RNAME"); } else { usage(); System.exit(1); } // Process the local REALM RNAME details String[] rproperties = new String[4]; rproperties = parseRealmProperties(RNAME); // Subscribe to the channel specified mySelf.doit(rproperties, channelName, loglvl, reportCount); } /** * Process the event * * @param evt An nConsumeEvent object containing the message received from * the channel */ public void go(nConsumeEvent evt) { // each event is acknowledged as it is received try { evt.ack(); } catch (Exception ex) { ex.printStackTrace(); } // If this is the first message we receive if (count == -1) { // Get a timestamp to be used for message rate calculations startTime = System.currentTimeMillis(); count = 0; } // Increment the counter count++; totalMsgs++; // Have we reached the point where we need to report the rates? if (count == reportCount) { // Reset the counter count = 0; // Get a timestamp to calculate the rates long end = System.currentTimeMillis(); // Does the specified log level permit us to print on the screen? if (logLevel >= 1) { // Dump the rates on the screen if (end != startTime) { System.out.println( "Received " + reportCount + " in " + (end - startTime) + " Evt/Sec = " + ((reportCount * 1000) / (end - startTime)) + " Bytes/sec=" + ((byteCount * 1000) / (end - startTime))); System.out.println("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession .getInputByteCount() + "]"); } else { System.out.println("Received " + reportCount + " faster than the system millisecond counter"); } } // Set the startTime for the next report equal to the end timestamp // for the previous one startTime = end; // Reset the byte counter byteCount = 0; } // If the last EID counter is not equal to the current event ID if (lastEID != evt.getEventID()) { // If yes, maybe we have missed an event, so print a message on the // screen. // This message could be printed for a number of other reasons. // One of them would be someone purging a range creating an 'eid // gap'. // As eids are never reused within a channel you could have a // situation // where this gets printed but nothing is missed. System.out.println("Expired event range " + (lastEID) + " - " + (evt.getEventID() - 1)); // Reset the last eid counter lastEID = evt.getEventID() + 1; } else { // Increment the last eid counter lastEID++; } // Get the data of the message byte[] buffer = evt.getEventData(); if (buffer != null) { // Add its length to the byte counter byteCount += buffer.length; } // If the loglevel permits printing on the screen if (logLevel >= 2) { // Print the eid System.out.println("Event id : " + evt.getEventID()); // If the loglevel permits printing on the screen if (logLevel >= 3) { // Print the message tag System.out.println("Event tag : " + evt.getEventTag()); // Print the message data System.out.println("Event data : " + new String(evt.getEventData())); if (evt.hasAttributes()) { displayEventAttributes(evt.getAttributes()); } nEventProperties prop = evt.getProperties(); if (prop != null) { displayEventProperties(prop); } } } } /** * Thread run loop performing the channel iteration */ public void run() { while (true) { try { // extract the next event from the channel nConsumeEvent evt = iterator.getNext(-1); if (evt != null) { go(evt); } } catch (nRequestTimedOutException timeOut) { // no event to consume } catch (Exception e) { break; } } } /** * Prints the usage message for this class which gives details of the * required arguments etc. */ private static void usage() { System.out.println("Usage ...\n"); System.out.println( "nnamediterator [name] [start eid] [debug] [count] [cluster wide] [persistent] [selector] \n"); System.out.println(" \n"); System.out.println(" - Channel name parameter for the channel to subscribe to"); System.out.println("\n[Optional Arguments] \n"); System.out .println("[name] - Specifies the unique name to be used for a named subscription (default: OS username) "); System.out.println( "[start eid] - The Event ID to start subscribing from if name subscriber is to be created (doesn't already exist)"); System.out.println("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All"); System.out .println("[count] - The number of events to wait for before printing out summary information (default: 1000) "); System.out.println( "[cluster wide] - Specifies whether the named object is to be used across a cluster (default: false) "); System.out.println( "[persistent] - Specifies whether the named object state is to be stored to disk or held in server memory (default: false) "); System.out.println("[selector] - The event filter string to use"); System.out.println("\n\nNote: -? provides help on environment variables \n"); } }// End of namedChannelIterator class