/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ package com.pcbsys.nirvana.apps; import com.pcbsys.nirvana.client.nConsumeEvent; import com.pcbsys.nirvana.client.nDataStreamListener; import com.pcbsys.nirvana.client.nEventProperties; import com.pcbsys.nirvana.client.nSessionFactory; import java.io.*; /** * This sample demonstrates how an nDataStreamListener application can be created. It simply calls the nSession.init * passing in the nDataStreamListener instance * and implements the onMessage callback. */ public class DataStreamListener extends nSampleApp implements nDataStreamListener { private long lastEID = 0; private long byteCount = 0; private long startTime = 0; private int logLevel = 0; private int count = -1; private int totalMsgs = 0; private int reportCount = 10000; private static DataStreamListener mySelf = null; /** * This method demonstrates the Nirvana API calls necessary to consume data as an nDataStreamListener * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param log the specified log level * @param repCount the specified report count */ private void doit(String[] realmDetails, int log, int repCount) throws Exception { try { logLevel = log; reportCount = repCount; mySelf.constructSession(realmDetails, this); // Stay subscribed until the user presses a key System.out.println("Press any key to quit !"); BufferedInputStream bis = new BufferedInputStream(System.in); try { bis.read(); } catch (Exception read) { } // Ignore this System.out.println("Finished. Consumed total of " + totalMsgs); } catch (Exception ex) { System.out.println("Exception raised : " + ex.getMessage()); ex.printStackTrace(); System.exit(1); } //Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory.shutdown(); } // callback for DataStreamListener public void onMessage(nConsumeEvent evt) { // If this is the first message we receive if (count == -1) { // Get a timestamp to be used for message rate calculations startTime = System.currentTimeMillis(); count = 0; } // Increment he counter count++; totalMsgs++; // Have we reached the point where we need to report the rates? if (count == reportCount) { // Reset the counter count = 0; // Get a timestampt to calculate the rates long end = System.currentTimeMillis(); // Does the specified log level permits us to print on the screen? if (logLevel >= 1) { // Dump the rates on the screen if (end != startTime) { System.out.println( "Received " + reportCount + " in " + (end - startTime) + " Evt/Sec = " + ((reportCount * 1000) / (end - startTime)) + " Bytes/sec=" + ((byteCount * 1000) / (end - startTime))); System.out.println("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession .getInputByteCount() + "]"); } else { System.out.println("Received " + reportCount + " faster than the system millisecond counter"); } } // Set the startTime for the next report equal to the end timestamp // for the previous one startTime = end; // Reset the byte counter byteCount = 0; } // If the last EID counter is not equal to the current event ID if (lastEID != evt.getEventID()) { // If yes, maybe we have missed an event, so print a message on the // screen. // This message could be printed for a number of other reasons. // One of them would be someone purging a range creating an 'eid // gap'. // As eids are never reused within a channel you could have a // situation // where this gets printed but nothing is missed. System.out.println("Expired event range " + (lastEID) + " - " + (evt.getEventID() - 1)); // Reset the last eid counter lastEID = evt.getEventID() + 1; } else { // Increment the last eid counter lastEID++; } // Get the data of the message byte[] buffer = evt.getEventData(); if (buffer != null) { // Add its length to the byte counter byteCount += buffer.length; } // If the loglevel permits printing on the screen if (logLevel >= 2) { // Print the eid System.out.println("Event id : " + evt.getEventID()); if (evt.isEndOfChannel()) { System.out.println("End of channel reached"); } // If the loglevel permits printing on the screen if (logLevel >= 3) { // Print the message tag System.out.println("Event tag : " + evt.getEventTag()); // Print the message data System.out.println("Event data : " + new String(evt.getEventData())); if (evt.hasAttributes()) { displayEventAttributes(evt.getAttributes()); } nEventProperties prop = evt.getProperties(); if (prop != null) { displayEventProperties(prop); } } } } protected void processArgs(String[] args) { try { // Process Environment Variables nSampleApp.processEnvironmentVariables(); // Need a min of 1, group name int loglevel = 1; if (args.length > 0) { loglevel = Integer.parseInt(args[0]); } int count = 1000; if (args.length > 1) { count = Integer.parseInt(args[1]); } // Check the local realm details String RNAME = null; if (System.getProperty("RNAME") != null) { RNAME = System.getProperty("RNAME"); } else { Usage(); System.exit(1); } // // Run the sample app // mySelf.doit(parseRealmProperties(RNAME), loglevel, count); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { //Create an instance for this class mySelf = new DataStreamListener(); //Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { System.out.println("Usage ...\n"); System.out.println("DataStreamListener [debug] [count] \n"); System.out.println(" \n"); System.out.println("\n[Optional Arguments] \n"); System.out.println("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All"); System.out.println("[count] - The number of events to wait before printing out summary information"); } }