/*
 *
 *   Copyright (c) 1999 - 2011 my-Channels Ltd
 *   Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 *   Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
#include "nSampleApp.h"
#include "nDataStreamListener.h"
#include "nSessionFactory.h"
#include "nSessionPausedException.h"
#include "nSecurityException.h"
#include "nSessionNotConnectedException.h"
#include "nUnexpectedResponseException.h"
#include "nRequestTimedOutException.h"
#include "nConsumeEvent.h"
#include "nConstants.h"
#include "nSession.h"

// NOTE(review): the names of the two system headers were missing from the
// text this file was restored from. The headers below are the ones required
// by the standard/POSIX names used in this file (ftime, atoi, exit,
// std::cout, std::string) - confirm against the original.
#include <sys/timeb.h>
#include <cstdlib>
#include <iostream>
#include <string>

using namespace com::pcbsys::nirvana::client;
using namespace com::pcbsys::nirvana::apps;

/**
 * Sample application that registers itself as a data stream listener and
 * prints throughput statistics (and, at higher log levels, details of every
 * event) for the events delivered to onMessage().
 */
class DataStreamListener : public nSampleApp, public nDataStreamListener
{
private:
    unsigned long m_startTime;   // start of the current reporting interval, in milliseconds
    longlong m_byteCount;        // payload bytes received in the current reporting interval
    int m_logLevel;              // 0 - none, 1 - summary, 2 - EIDs, 3 - all
    int m_count;                 // -1 until the first event arrives, then events since the last report
    int m_totalMsgs;             // events received since start-up
    int m_reportCount;           // number of events between two summary reports
    static DataStreamListener *m_pSelf;

    /**
     * Returns the current wall-clock time in milliseconds, as reported by
     * ftime(). The value may wrap where unsigned long is 32 bits wide; the
     * callers only use differences between two readings.
     */
    static unsigned long currentTimeMillis()
    {
        struct timeb tm;
        ftime(&tm);
        return (unsigned long) tm.time * 1000 + tm.millitm;
    }

    /**
     * This method demonstrates the Nirvana API calls necessary to consume data from nDataGroup objects
     *
     * It is called after all command line arguments have been received and
     * validated
     *
     * @param pRealmDetails an array containing the possible RNAME values
     * @param nRealmDetail the number of entries in pRealmDetails
     * @param loglvl the specified log level
     * @param repCount the specified report count
     */
    void doit(std::string *pRealmDetails, int nRealmDetail, int loglvl, int repCount)
    {
        // Store the reporting configuration before the session is constructed:
        // this object is handed over as the listener, so it is safer for
        // onMessage() never to observe the default values.
        m_logLevel = loglvl;
        m_reportCount = repCount;

        m_pSelf->constructSession(pRealmDetails, nRealmDetail, this);

        try
        {
            //Stay subscribed until the user presses any key
            std::cout << "Press any key to quit !" << std::endl;
            std::cin.ignore();
            std::cout << "Finished. Consumed total of " << m_totalMsgs << std::endl;
        }
        catch (...)
        {
            // NOTE(review): the original exception handlers were missing from
            // the restored text; this generic handler is a placeholder.
            std::cerr << "Unexpected error while waiting for input" << std::endl;
        }

        // NOTE(review): any session clean-up code that followed the try block
        // was missing from the restored text and has NOT been reconstructed -
        // restore it from the original sample.
    }

    /**
     * Parses the command line and starts the sample.
     *
     * NOTE(review): the beginning of this method (signature, argument count
     * check, RNAME and loglvl declarations) was missing from the restored
     * text and has been reconstructed from the usage message and from the
     * call made in Main() - confirm against the original.
     *
     * @param argc number of command line arguments
     * @param argv command line arguments: <rname> [debug] [count]
     */
    void processArgs(int argc, char** argv)
    {
        //
        // Required Parameters
        //
        if (argc < 2)
        {
            Usage();
            exit(EXIT_FAILURE);
        }
        std::string RNAME = argv[1];

        //
        // Optional Parameters
        //
        int loglvl = 0;
        //Check for a log level value
        if (argc > 2)
        {
            loglvl = atoi(argv[2]);
        }

        int report = 1000;
        //Check for a report count value
        if (argc > 3)
        {
            report = atoi(argv[3]);
        }

        //
        // Run the sample app
        //
        int nRproperty = 0;
        std::string *pRproperties = parseRealmProperties(RNAME, nRproperty);
        m_pSelf->doit(pRproperties, nRproperty, loglvl, report);
    }

public:
    DataStreamListener () : m_startTime(0), m_byteCount(0), m_logLevel(0), m_count(-1), m_totalMsgs(0), m_reportCount(10000)
    {
    }

    /**
     * Entry point used by main(): creates the single instance of this class
     * and lets it process the command line.
     *
     * @return always 0
     */
    static int Main(int argc, char** argv)
    {
        //Create an instance for this class
        m_pSelf = new DataStreamListener();

        //Process command line arguments
        m_pSelf->processArgs(argc, argv);

        return 0;
    }

private:
    /**
     * Prints the usage message for this class
     */
    static void Usage()
    {
        // NOTE(review): the "<rname>" and "<Required Arguments>" tokens were
        // missing from the restored text and have been put back to match the
        // "[Optional Arguments]" section - confirm against the original.
        std::cout << "Usage ...\n\n"
                  << "DataStreamListener <rname> [debug] [count] \n\n"
                  << "<Required Arguments> \n\n"
                  << "<rname> - the rname of the server to connect to\n\n"
                  << "\n[Optional Arguments] \n\n"
                  << "[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"
                  << "[count] - The number of events to wait before printing out summary information" << std::endl;
    }

public:
    /**
     * A callback is received by the API to this method each time an event is received from
     * the nirvana channel. Be careful not to spend too much time processing the message
     * inside this method, as until it exits the next message can not be pushed.
     *
     * @param pEvt An nConsumeEvent object containing the message received from the channel
     */
    void onMessage(nConsumeEvent *pEvt)
    {
        //If this is the first message we receive
        if (m_count == -1)
        {
            //Get a timestamp to be used for message rate calculations
            m_startTime = currentTimeMillis();
            m_count = 0;
        }

        //Increment the counters
        m_count++;
        m_totalMsgs++;

        //Have we reached the point where we need to report the rates?
        if (m_count == m_reportCount)
        {
            //Reset the counter
            m_count = 0;

            //Get a timestamp to calculate the rates
            unsigned long end = currentTimeMillis();

            //Does the specified log level permit us to print on the screen?
            if (m_logLevel >= 1)
            {
                //Dump the rates on the screen (skipped for a zero-length interval to avoid dividing by zero)
                if (end != m_startTime)
                {
                    unsigned long milli = end - m_startTime;

                    // NOTE(review): everything between "Received " and the
                    // getOutputByteCount() call was missing from the restored
                    // text; the wording and the rate expressions below are a
                    // reconstruction - confirm against the original.
                    std::cout << "Received " << m_reportCount
                              << " in " << milli
                              << " Evt/Sec = " << ((unsigned long) m_reportCount * 1000) / milli
                              << " Bytes/sec=" << (m_byteCount * 1000) / (longlong) milli
                              << std::endl;
                    std::cout << "Bandwidth data : Bytes Tx [" << m_pSession->getOutputByteCount()
                              << "] Bytes Rx [" << m_pSession->getInputByteCount() << "]"
                              << std::endl;
                }
            }

            // NOTE(review): reconstructed - start a new measuring interval so
            // that the next report covers only the events received from now on.
            m_startTime = end;
            m_byteCount = 0;
        }

        //Add the size of the event payload, if there is one, to the byte count
        // NOTE(review): the declared type of 'length' was missing from the restored text; int is assumed.
        int length = pEvt->getEventDataLength();
        unsigned char *pBuffer = pEvt->getEventData();
        if (pBuffer != NULL)
        {
            m_byteCount += length;
        }

        //If the loglevel permits printing on the screen
        if (m_logLevel >= 2)
        {
            //Print the eid
            std::cout << "Event id : " << pEvt->getEventID() << std::endl;
            if (pEvt->isEndOfChannel())
            {
                std::cout << "End of channel reached" << std::endl;
            }

            //If the loglevel permits printing on the screen
            if (m_logLevel >= 3)
            {
                //Print the message tag
                std::cout << "Event tag : " << pEvt->getEventTag() << "\n";

                //Print the message data. The payload is written with an explicit
                //length: it is a raw byte buffer that may be NULL and is not
                //guaranteed to be NUL-terminated, so it must not be streamed as
                //a C string.
                std::cout << "Event data : ";
                if (pBuffer != NULL && length > 0)
                {
                    std::cout.write(reinterpret_cast<const char *>(pBuffer), length);
                }
                std::cout << std::endl;

                if (pEvt->hasAttributes())
                {
                    displayEventAttributes(pEvt->getAttributes());
                }
                nEventProperties *pProp = pEvt->getProperties();
                if (pProp != NULL)
                {
                    displayEventProperties(pProp);
                }
            }
        }

        //Drop the reference on the event now that it has been processed
        pEvt->delRef();
    }
};

DataStreamListener* DataStreamListener::m_pSelf = NULL;

int main (int argc, char** argv)
{
    return DataStreamListener::Main (argc, argv);
}