/* \n Copyright 1999-2011 (c) My-Channels \n Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. \n\n Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.\n */\n #include "nSampleApp.h" #include "nDataStreamListener.h" #include "nSessionFactory.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nSessionNotConnectedException.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include "nConsumeEvent.h" #include "nConstants.h" #include "nSession.h" #include "Poco/Exception.h" #include #ifdef WIN32 #include #endif using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; using namespace Poco; class DataStreamListener : public nSampleApp, public nDataStreamListener { private: unsigned long m_startTime; longlong m_byteCount; int m_logLevel; int m_count; int m_totalMsgs; int m_reportCount; static DataStreamListener *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to consume data from nDataGroup objects * * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param loglvl the specified log level * @param repCount the specified report count */ void doit(std::string *pRealmDetails, int nRealmDetail, int loglvl, int repCount) { m_pSelf->constructSession(pRealmDetails, nRealmDetail, this); m_logLevel = loglvl; m_reportCount = repCount; try { //Stay subscribed until the user presses any key printf("Press any key to quit !\n"); try { char buffer[80]; gets (buffer); } catch (Exception read) { } //Ignore this printf("Finished. Consumed total of %d\n", m_totalMsgs); //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory::shutdown(); } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Insufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 2, rname if (argc < 2) { Usage(); exit(2); } std::string RNAME = argv[1]; int loglvl = 3; if (argc > 2) { loglvl = atoi(argv[2]); } // // Optional Parameters // int report = 1000; //Check for a selector message filter value if (argc > 3) { report = atoi(argv[3]); } // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, loglvl, report); } public: DataStreamListener () : m_startTime(0), m_byteCount(0), m_logLevel(0), m_count(-1), m_totalMsgs(0), m_reportCount(10000) { } static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new DataStreamListener(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } private: /** * Prints the usage message for this class */ static void Usage() { printf("Usage ...\n\n"); printf("DataStreamListener [debug] [count] \n\n"); printf(" \n\n"); printf(" - the rname of the server to connect to\n"); printf("\n[Optional Arguments] \n\n"); printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"); printf("[count] - The number of events to wait before printing out summary information\n"); } /** * A callback is received by the API to this method each time an event is received from * the nirvana channel. Be carefull not to spend too much time processing the message * inside this method, as until it exits the next message can not be pushed. * * @param evt An nConsumeEvent object containing the message received from the channel */ public: void onMessage(nConsumeEvent *pEvt) { //If this is the first message we receive if (m_count == -1) { //Get a timestamp to be used for message rate calculations struct timeb tm; ftime (&tm); m_startTime = (unsigned long) tm.time * 1000 + tm.millitm; m_count = 0; } //Increment he counter m_count++; m_totalMsgs++; //Have we reached the point where we need to report the rates? if (m_count == m_reportCount) { //Reset the counter m_count = 0; //Get a timestampt to calculate the rates struct timeb tm; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; // Does the specified log level permits us to print on the screen? if (m_logLevel >= 1) { //Dump the rates on the screen if (end != m_startTime) { unsigned long milli = end - m_startTime; printf("Received %d in %d Evt/Sec = %d Bytes/sec = %d\n", m_reportCount, milli, ((m_reportCount * 1000) / milli), ((m_byteCount * 1000) / milli)); printf("Bandwidth data : Bytes Tx %d Bytes Rx [%d]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } else { printf("Received %d faster than the system millisecond counter\n", m_reportCount); } } //Set the startTime for the next report equal to the end timestamp for the previous one m_startTime = end; //Reset the byte counter m_byteCount = 0; } //Get the data of the message int length = pEvt->getEventDataLength(); unsigned char *pBuffer = pEvt->getEventData(); if (pBuffer != NULL) { m_byteCount += length; } //If the loglevel permits printing on the screen if (m_logLevel >= 2) { //Print the eid printf("Event id : %d\n", pEvt->getEventID()); if (pEvt->isEndOfChannel()) { printf("End of channel reached\n"); } //If the loglevel permits printing on the screen if (m_logLevel >= 3) { //Print the message tag printf("Event tag : %s\n", pEvt->getEventTag().c_str()); //Print the message data unsigned char *pString = pEvt->getEventData(); std::string eventData; nConstants::decode (pString, pEvt->getEventDataLength(), eventData); printf("Event data : %s\n", eventData.c_str()); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } } } pEvt->delRef(); } }; DataStreamListener* DataStreamListener::m_pSelf = NULL; int main (int argc, char** argv) { return DataStreamListener::Main (argc, argv); }