/* \n Copyright 1999-2011 (c) My-Channels \n Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. \n\n Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.\n */\n #include "nSampleApp.h" #include "nConstants.h" #include "nChannel.h" #include "nChannelIterator.h" #include "nChannelAttributes.h" #include "nSession.h" #include "nChannelNotFoundException.h" #include "nSecurityException.h" #include "nSessionNotConnectedException.h" #include "nUnexpectedResponseException.h" #include "nUnknownRemoteRealmException.h" #include "nRequestTimedOutException.h" #include "nSessionFactory.h" #include "nConsumeEvent.h" #include "Poco/Thread.h" #include using namespace com::pcbsys::nirvana::apps; using namespace Poco; class channeliterator : public nSampleApp, public Runnable { /** * Uses a channel iterator to consume events from a nirvana channel. * * This is a synchronous method of event retrieval */ public: channeliterator() : m_lastEID(0), m_startTime(0), m_byteCount(0), m_logLevel(0), m_count(-1), m_totalMsgs(0), m_reportCount(1000), m_pChannel(NULL), m_pIterator(NULL), m_pChannelReader(NULL) { } private: longlong m_lastEID; unsigned long m_startTime; long m_byteCount; int m_logLevel; int m_count; int m_totalMsgs; int m_reportCount; bool m_bCont; nChannel *m_pChannel; nChannelIterator *m_pIterator; Thread *m_pChannelReader; static channeliterator *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to consume * events from a channel using a channel iterator * * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a String[] containing the possible RNAME values * @param nRealmDetail the length of the rname array * @param channelName the channel name to create * @param sel the subscription filter * @param start the eid to start subscribing from * @param loglvl the specified log level * @param repCount the specified report count */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string& channelName, std::string& sel, long start, int loglvl, int repCount) { m_logLevel = loglvl; m_reportCount = repCount; m_bCont = true; nConstants::setClientLogLevel(1); m_pSelf->constructSession(pRealmDetails, nRealmDetail); //Creates the iterator on specified channel try { //Create a channel attributes object nChannelAttributes *pNca = new nChannelAttributes(); pNca->setName(channelName); //Obtain the channel reference m_pChannel = m_pSession->findChannel(pNca); //if the latest event has been implied (by specifying -1) if (start == -1) { //Get the last eid on the channel and reset the start eid with that value start = m_pChannel->getLastEID() + 1; } //create the channel iterator object and the read thread m_pIterator = m_pChannel->createIterator(sel, start); std::string threadName = "IteratorReadThread"; m_pChannelReader = new Thread(threadName); m_pChannelReader->setPriority (Thread::PRIO_LOW); m_pChannelReader->start(*this); //Stay subscribed until the user presses any key printf("Press any key to quit !\n"); char buffer[256]; gets (buffer); printf("Finished. Consumed total of %d\n", m_totalMsgs); m_pIterator->close(); m_bCont = false; delete m_pChannelReader; delete m_pIterator; delete m_pChannel; delete pNca; } //Handle errors catch (nChannelNotFoundException cnfe) { printf("The channel specified could not be found.\n"); printf("Please ensure that the channel exists in the REALM you connect to.\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana Realm.\n"); printf("Please ensure the realm is running and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nUnknownRemoteRealmException urre) { printf("The channel specified resided in a remote realm which could not be found.\n"); printf("Please ensure the channel name specified is correct.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } catch (nBaseClientException nbce) { printf("An error occured while creating the Channel Attributes object.\n"); exit(1); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions within this program so that we can exit nSessionFactory::shutdown(); } public: void run() { while (m_bCont) { try { nConsumeEvent *pEvt = m_pIterator->getNext(1000); if (pEvt != NULL) { go(pEvt); } } catch (nRequestTimedOutException timeOut) { // no event to consume } catch (Exception e) { break; } } } /** * Prints the usage message for this class */ private: static void Usage() { printf("Usage ...\n\n"); printf("channeliterator [start eid] [debug] [count] [selector] \n\n"); printf(" \n\n"); printf(" - the rname of the server to connect to\n"); printf(" - Channel name parameter for the channel to subscribe to\n"); printf("\n[Optional Arguments] \n\n"); printf("[start eid] - The Event ID to start subscribing from\n"); printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"); printf("[count] - The number of events to wait before printing out summary information\n"); printf("[selector] - The event filter string to use\n"); } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 2, rname, channel name if(argc < 3) { Usage(); exit(2); } std::string RNAME = argv[1]; std::string channelName = argv[2]; int start = 0; if (argc > 3) { start = atoi(argv[3]); } int loglvl = 3; if (argc > 4) { loglvl = atoi(argv[4]); } // // Optional Parameters // int report = 1000; //Check for a selector message filter value if (argc > 5) { report = atoi(argv[5]); } std::string sel; if (argc > 6) { sel = argv[6]; } // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, channelName, sel, start, loglvl, report); } public: static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new channeliterator(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Process the event consumed * * @param pEvt An nConsumeEvent object containing the message received from the channel */ public: void go(nConsumeEvent *pEvt) { //If this is the first message we receive if (m_count == -1) { //Get a timestamp to be used for message rate calculations struct timeb tm; ftime (&tm); m_startTime = (unsigned long) tm.time * 1000 + tm.millitm; m_count = 0; } //Increment he counter m_count++; m_totalMsgs++; //Have we reached the point where we need to report the rates? if (m_count == m_reportCount) { //Reset the counter m_count = 0; //Get a timestampt to calculate the rates struct timeb tm; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; // Does the specified log level permits us to print on the screen? if (m_logLevel >= 1) { //Dump the rates on the screen if (end != m_startTime) { unsigned long ts = end - m_startTime; printf("Received %d in %d Evt/Sec = %d Bytes/sec=%d\n", m_reportCount, ts, ((m_reportCount * 1000) / ts), ((m_byteCount * 1000) / ts)); printf("Bandwidth data : Bytes Tx [%d] Bytes Rx [%d]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } else { printf("Received %d faster than the system millisecond counter\n", m_reportCount); } } //Set the startTime for the next report equal to the end timestamp for the previous one m_startTime = end; //Reset the byte counter m_byteCount = 0; } //If the last EID counter is not equal to the current event ID if (m_lastEID != pEvt->getEventID()) { //If yes, maybe we have missed an event, so print a message on the screen. //This message could be printed for a number of other reasons. //One of them would be someone purging a range creating an 'eid gap'. //As eids are never reused within a channel you could have a situation //where this gets printed but nothing is missed. printf("Expired event range %d - %d\n", m_lastEID, (pEvt->getEventID() - 1)); //Reset the last eid counter m_lastEID = pEvt->getEventID() + 1; } else { //Increment the last eid counter m_lastEID++; } //Get the data of the message unsigned char *pBuffer = pEvt->getEventData(); int length = pEvt->getEventDataLength(); if (pBuffer != NULL) { //Add its length to the byte counter m_byteCount += length; } //If the loglevel permits printing on the screen if (m_logLevel >= 2) { //Print the eid printf("Event id : %d\n", pEvt->getEventID()); if (pEvt->isEndOfChannel()) { printf("End of channel reached\n"); } //If the loglevel permits printing on the screen if (m_logLevel >= 3) { //Print the message tag printf("Event tag : %s\n", pEvt->getEventTag().c_str()); //Print the message data unsigned char *pBuffer = pEvt->getEventData(); int length = pEvt->getEventDataLength(); std::string eventData; nConstants::decode(pBuffer, length, eventData); printf("Event data : %s\n", eventData.c_str()); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } } } } }; channeliterator* channeliterator::m_pSelf = NULL; int main (int argc, char** argv) { return channeliterator::Main (argc, argv); }