/*
 *
 *   Copyright (c) 1999 - 2011 my-Channels Ltd
 *   Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 *   Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
#include "nSampleApp.h"
#include "nEventListener.h"
#include "nChannelAttributes.h"
#include "nSubscriptionAttributes.h"
#include "nSession.h"
#include "nBaseClientException.h"
#include "nSessionFactory.h"
#include "nConsumeEvent.h"
#include "nConstants.h"

#include <sys/timeb.h>

#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <list>
#include <string>
#include <vector>

using namespace com::pcbsys::nirvana::client;
using namespace com::pcbsys::nirvana::apps;

/**
 * Sample application that subscribes to a comma separated list of channels
 * through a single session-level subscribe call and reports the events it
 * receives (rates, event ids, tags, data) according to the chosen log level.
 */
class sessionSubscriber : public nSampleApp, public nEventListener
{
private:
    static sessionSubscriber *m_pSelf;
    static longlong m_startEid;
    static std::string m_selector;

    unsigned long m_startTime;  // timestamp (ms) of the start of the current reporting window
    longlong m_lastEID;         // event id expected for the next event
    longlong m_byteCount;       // payload bytes received in the current reporting window
    int m_logLevel;             // 0 - none, 1 - summary, 2 - EIDs, 3 - all
    int m_count;                // events in the current window; -1 until the first event arrives
    int m_totalMsgs;            // total events received
    int m_reportCount;          // number of events per reporting window

    /**
     * Returns the current wall-clock time in milliseconds as reported by ftime().
     */
    static unsigned long currentTimeMillis()
    {
        struct timeb tm;
        ftime(&tm);
        return (unsigned long) tm.time * 1000 + tm.millitm;
    }

    /**
     * Splits a comma separated list into its individual names.
     *
     * Empty fields are preserved, so the result always holds at least one
     * entry (a single empty string for empty input).
     *
     * @param channelNames The comma separated list of channel names
     * @return The individual names, in the order they appear in the input
     */
    static std::vector<std::string> splitChannelNames(const std::string &channelNames)
    {
        std::vector<std::string> names;
        std::string::size_type start = 0;
        while (true)
        {
            const std::string::size_type comma = channelNames.find(',', start);
            if (comma == std::string::npos)
            {
                names.push_back(channelNames.substr(start));
                break;
            }
            names.push_back(channelNames.substr(start, comma - start));
            start = comma + 1;
        }
        return names;
    }

    /**
     * Connects to the realm, subscribes to every channel named in channelNames,
     * waits for the user to press a key and then closes the session.
     *
     * Calls exit(1) if an Exception is raised while subscribing.
     *
     * @param pRealmDetails Array of realm connection strings
     * @param nRealmDetail  Number of entries in pRealmDetails
     * @param channelNames  Comma separated list of channel names
     * @param selector      Event filter string passed to each subscription
     * @param startEid      Event id passed to each subscription as the start point
     * @param loglvl        Log level used by go()
     * @param repCount      Number of events per rate report
     */
    void doit(std::string *pRealmDetails, int nRealmDetail, const std::string& channelNames,
              const std::string& selector, longlong startEid, int loglvl, int repCount)
    {
        m_logLevel = loglvl;
        m_reportCount = repCount;

        m_pSelf->constructSession(pRealmDetails, nRealmDetail);

        // Both containers live until the end of this method, i.e. until after
        // the session has been closed, and are released automatically.
        const std::vector<std::string> channelList = splitChannelNames(channelNames);
        std::vector<nSubscriptionAttributes*> attrs;

        try
        {
            const int numChannel = static_cast<int>(channelList.size());

            // Create one subscription attributes object per channel.
            // NOTE(review): ownership of these objects after subscribe() is not
            // visible from this file, so they are deliberately not deleted here
            // - confirm against the client API.
            attrs.reserve(channelList.size());
            for (int x = 0; x < numChannel; x++)
            {
                attrs.push_back(new nSubscriptionAttributes(channelList[x], selector, startEid, this));
            }

            // channelList is never empty (see splitChannelNames), so &attrs[0] is valid
            m_pSession->subscribe(&attrs[0], numChannel);

            for (int x = 0; x < numChannel; x++)
            {
                if (attrs[x]->wasSuccessful())
                {
                    printf("Successfully subscribed to %s\n", attrs[x]->getChannelName().c_str());
                }
                else
                {
                    printf("Failed to subscribe to %s : %s\n", attrs[x]->getChannelName().c_str(),
                           attrs[x]->getException()->message().c_str());
                }
            }

            // Stay subscribed until the user presses any key
            printf("Press any key to quit !\n");
            std::cin.ignore();
            printf("Finished. Consumed total of %d\n", m_totalMsgs);
        }
        catch (Exception &nbce)
        {
            printf("An error occurred while subscribing to the channels.\n");
            printf("%s\n", nbce.message().c_str());
            exit(1);
        }

        try
        {
            nSessionFactory::close(m_pSession);
        }
        catch (Exception &ex)
        {
            // Closing is best effort, but do not hide the failure completely
            printf("Failed to close the session : %s\n", ex.message().c_str());
        }
    }

protected:
    /**
     * Parses the command line and runs the subscriber.
     *
     * Expected arguments: rname, channel names, then optionally start eid,
     * log level, report count and selector. Prints the usage text and calls
     * exit(2) when fewer than the two required arguments are supplied.
     *
     * @param argc Number of entries in argv
     * @param argv Command line arguments (argv[0] is the program name)
     */
    virtual void processArgs(int argc, char** argv)
    {
        // Need a min of 2, rname, channel names
        if (argc < 3)
        {
            Usage();
            exit(2);
        }

        std::string RNAME = argv[1];
        std::string channelName = argv[2];

        // NOTE(review): atoi() yields an int while event ids are held in a
        // longlong elsewhere in this class, so very large start eids would be
        // truncated here.
        int start = 0;
        if (argc > 3)
            start = atoi(argv[3]);

        int loglvl = 3;
        if (argc > 4)
            loglvl = atoi(argv[4]);

        int report = 1000;
        if (argc > 5)
            report = atoi(argv[5]);

        std::string sel;
        if (argc > 6)
            sel = argv[6];

        int nRproperty = 0;
        std::string *pRproperties = parseRealmProperties(RNAME, nRproperty);

        m_pSelf->doit(pRproperties, nRproperty, channelName, sel, start, loglvl, report);

        delete[] pRproperties;
    }

public:
    /**
     * Program entry point: creates the subscriber instance, processes the
     * command line and shuts the session factory down.
     *
     * @return 0 on success, 1 if an Exception escaped the run
     */
    static int Main(int argc, char** argv)
    {
        //Create an instance for this class
        m_pSelf = new sessionSubscriber();

        try
        {
            //Parse the arguments, run the subscriber, then shut the factory down
            m_pSelf->processArgs(argc, argv);
            nSessionFactory::shutdown();
        }
        catch (Exception &ex)
        {
            printf("An error occurred : %s\n", ex.message().c_str());
            return 1;
        }

        return 0;
    }

    sessionSubscriber()
        : m_startTime(0), m_lastEID(0), m_byteCount(0), m_logLevel(0),
          m_count(-1), m_totalMsgs(0), m_reportCount(10000)
    {
    }

private:
    ///
    /// Prints the usage string for this class
    ///
    static void Usage()
    {
        printf("Usage ...\n\n");
        printf("sessionSubscriber <rname> <channel names> [start eid] [debug] [count] [selector] \n\n");
        printf("<Required Arguments> \n\n");
        printf("<rname> - The RNAME of the realm you wish to connect to\n");
        printf("<channel names> - Comma separated list of channels to subscribe to\n");
        printf("\n[Optional Arguments] \n\n");
        printf("[start eid] - The Event ID to start subscribing from\n");
        printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n");
        printf("[count] - The number of events to wait before printing out summary information\n");
        printf("[selector] - The event filter string to use\n");
    }

public:
    /**
     * A callback is received by the API to this method each time an event is received from
     * the nirvana channel. Be careful not to spend too much time processing the message
     * inside this method, as until it exits the next message can not be pushed.
     *
     * @param pEvt An nConsumeEvent object containing the message received from the channel
     */
    void go(nConsumeEvent *pEvt)
    {
        //If this is the first message we receive
        if (m_count == -1)
        {
            //Get a timestamp to be used for message rate calculations
            m_startTime = currentTimeMillis();
            m_count = 0;
        }

        //Increment the counters
        m_count++;
        m_totalMsgs++;

        //Have we reached the point where we need to report the rates?
        if (m_count == m_reportCount)
        {
            //Reset the counter
            m_count = 0;

            //Get a timestamp to calculate the rates
            const unsigned long end = currentTimeMillis();

            //Does the specified log level permit us to print on the screen?
            if (m_logLevel >= 1)
            {
                //Dump the rates on the screen
                if (end != m_startTime)
                {
                    const unsigned long milli = end - m_startTime;
                    // Do the arithmetic in unsigned types so that
                    // m_reportCount * 1000 cannot overflow an int and the
                    // values match the %lu / %llu conversions below.
                    const unsigned long evtPerSec =
                        (static_cast<unsigned long>(m_reportCount) * 1000UL) / milli;
                    const unsigned long long bytesPerSec =
                        (static_cast<unsigned long long>(m_byteCount) * 1000ULL) / milli;

                    printf("Received %d in %lu Evt/Sec = %lu Bytes/sec=%llu\n",
                           m_reportCount, milli, evtPerSec, bytesPerSec);
                    printf("Bandwidth data : Bytes Tx [%llu] Bytes Rx [%llu]\n",
                           m_pSession->getOutputByteCount(), m_pSession->getInputByteCount());
                }
                else
                {
                    printf("Received %d faster than the system millisecond counter\n", m_reportCount);
                }
            }

            //Set the startTime for the next report equal to the end timestamp for the previous one
            m_startTime = end;

            //Reset the byte counter
            m_byteCount = 0;
        }

        //If the last EID counter is not equal to the current event ID
        if (m_lastEID != pEvt->getEventID())
        {
            //If yes, maybe we have missed an event, so print a message on the screen.
            //This message could be printed for a number of other reasons.
            //One of them would be someone purging a range creating an 'eid gap'.
            //As eids are never reused within a channel you could have a situation
            //where this gets printed but nothing is missed.
            printf("Expired event range %lld - %lld\n", m_lastEID, (pEvt->getEventID() - 1));

            //Reset the last eid counter
            m_lastEID = pEvt->getEventID() + 1;
        }
        else
        {
            //Increment the last eid counter
            m_lastEID++;
        }

        //Get the data of the message
        unsigned char *pBuffer = pEvt->getEventData();
        int dataLength = pEvt->getEventDataLength();

        if (pBuffer != NULL)
        {
            //Add its length to the byte counter
            m_byteCount += dataLength;
        }

        //If the loglevel permits printing on the screen
        if (m_logLevel >= 2)
        {
            //Print the eid
            printf("Event id : %lld\n", pEvt->getEventID());

            if (pEvt->isEndOfChannel())
            {
                printf("End of channel reached\n");
            }

            //If the loglevel permits printing on the screen
            if (m_logLevel >= 3)
            {
                //Print the message tag
                printf("Event tag : %s\n", pEvt->getEventTag().c_str());

                //Print the message data
                std::string eventData;
                nConstants::decode(pEvt->getEventData(), pEvt->getEventDataLength(), eventData);
                printf("Event data : %s\n", eventData.c_str());

                if (pEvt->hasAttributes())
                {
                    displayEventAttributes(pEvt->getAttributes());
                }

                nEventProperties *pProp = pEvt->getProperties();
                if (pProp != NULL)
                {
                    displayEventProperties(pProp);
                }
            }
        }
    }

    int getType()
    {
        return 0;
    }
}; // End of sessionSubscriber Class

sessionSubscriber* sessionSubscriber::m_pSelf = NULL;
longlong sessionSubscriber::m_startEid = 0;
std::string sessionSubscriber::m_selector;

int main(int argc, char** argv)
{
    return sessionSubscriber::Main(argc, argv);
}