/* \n Copyright 1999-2011 (c) My-Channels \n Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. \n\n Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.\n */\n #include "nSampleApp.h" #include "nEventListener.h" #include "nChannelWatcher.h" #include "nChannel.h" #include "nConstants.h" #include "nSession.h" #include "nConsumeEvent.h" #include "nChannelAttributes.h" #include "nSessionFactory.h" #include "nChannelNotFoundException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nSessionNotConnectedException.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include #include #ifdef WIN32 #include #endif using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; class subscriber : public nSampleApp, public nChannelWatcher { /** * Uses a listener to consume events asynchronously from a nirvana channel. */ private: static subscriber *m_pSelf; static longlong m_startEid; static std::string m_selector; int m_count; unsigned long m_startTime; int m_totalMsgs; int m_reportCount; int m_logLevel; longlong m_byteCount; longlong m_lastEID; nChannel *m_pChannel; /** * This method demonstrates the Nirvana API calls necessary to consume * events from a channel using an asynchronous callback * * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a String[] containing the possible RNAME values * @param nRealmDetail the length of the rname array * @param channelName the channel name to create * @param selector the subscription filter * @param startEid the eid to start subscribing from * @param loglvl the specified log level * @param repCount the specified report count */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string& channelName, std::string& selector, longlong startEid, int loglvl, int repCount) { m_pSelf->constructSession(pRealmDetails, nRealmDetail); m_logLevel = loglvl; m_reportCount = repCount; try { nChannelAttributes *pNca = new nChannelAttributes(); pNca->setName(channelName); // locate the channel m_pChannel = m_pSession->findChannel(pNca); if (m_startEid == -1) { m_startEid = m_pChannel->getLastEID(); } // add the subscriber from the given event id m_pChannel->addSubscriber((nChannelWatcher*)this, selector, startEid); // wait for user input printf("Press any key to quit !\n"); try { char buffer[80]; gets (buffer); } catch (Exception read) { } //Ignore this printf("Finished. Consumed total of %d\n", m_totalMsgs); m_pChannel->removeSubscriber(this); try { // close the session nSessionFactory::close(m_pSession); } catch (Exception ex) { } // close any other sessions in this programs so it can exit nSessionFactory::shutdown(); if (m_pSession) delete m_pSession; if (m_pChannel) { delete m_pChannel; m_pChannel = NULL; } } catch (nChannelNotFoundException cnf) { printf(cnf.message().c_str()); exit(1); } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } } int getType () { return fBase::CHANNELWATCHER; } protected: /** * Process the command line args */ void processArgs(int argc, char** argv) { if (argc < 3) { Usage(); exit(2); } std::string channelName = argv[2]; m_startEid = -1; if (argc > 3) m_startEid = atoi (argv[3]); int loglvl = 0; if (argc > 4) loglvl = atoi (argv[4]); int reportCount = 1000; if (argc > 5) { reportCount = atoi(argv[5]); } if (argc > 6) { m_selector = argv[6]; } std::string RNAME = argv[1]; int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, channelName, m_selector, m_startEid, loglvl, reportCount); } public: subscriber () : m_count(-1), m_startTime(0), m_totalMsgs(0), m_reportCount(10000), m_logLevel(0), m_byteCount(0), m_lastEID(0), m_pChannel(NULL) { } /** * Prints the usage message for this class */ void Usage() { printf("Usage ...\n"); printf("subscriber [start eid] [debug] [count] [selector] \n"); printf(" \n\n"); printf(" - URL of realm to connect to\n"); printf(" - Channel name parameter for the channel to subscribe to\n"); printf("\n[Optional Arguments] \n\n"); printf("[start eid] - The Event ID to start subscribing from\n"); printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"); printf("[count] - The number of events to wait before printing out summary information\n"); printf("[selector] - The event filter string to use\n"); } static int Main(int argc, char** argv) { m_pSelf = new subscriber(); m_pSelf->processArgs(argc, argv); return 0; } void purge(longlong start, longlong end, std::string& filter) { printf("Received purge for %d to %d filter %s", start, end, filter.c_str()); } /** * A callback is received by the API to this method each time an event is received from * the nirvana channel. Be carefull not to spend too much time processing the message * inside this method, as until it exits the next message can not be pushed. * * @param pEvt An nConsumeEvent object containing the message received from the channel */ void go(nConsumeEvent *pEvt) { if (m_count == -1) { struct timeb tm; ftime (&tm); m_startTime = (unsigned long) tm.time * 1000 + tm.millitm; m_count = 0; } m_count++; m_totalMsgs++; if (m_count == m_reportCount) { m_count = 0; struct timeb tm; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; if (m_logLevel >= 1) { if (end != m_startTime) { unsigned long milli = end - m_startTime; printf("Received %d in %d Evt/Sec = %d Bytes/sec = %d\n", m_reportCount, milli, ((m_reportCount * 1000) / milli), ((m_byteCount * 1000) / milli)); printf("Bandwidth data : Bytes Tx %d Bytes Rx [%d]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } else { printf("Received %d faster than the system millisecond counter\n", m_reportCount); } } m_startTime = end; m_byteCount = 0; } if (m_lastEID != pEvt->getEventID()) { printf("Expired event range %d - %d\n", m_lastEID, (pEvt->getEventID() - 1)); m_lastEID = pEvt->getEventID() + 1; } else { m_lastEID++; } int length = pEvt->getEventDataLength(); unsigned char *pBuffer = pEvt->getEventData(); if (pBuffer != NULL) { m_byteCount += length; } if (m_logLevel >= 2) { printf("Event id : %d\n", pEvt->getEventID()); if (pEvt->isEndOfChannel()) { printf("End of channel reached\n"); } if (m_logLevel >= 3) { printf("Event tag : %s\n", pEvt->getEventTag().c_str()); unsigned char *pString = pEvt->getEventData(); std::string eventData; nConstants::decode (pString, pEvt->getEventDataLength(), eventData); printf("Event data : %s\n", eventData.c_str()); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } } } } }; subscriber* subscriber::m_pSelf = NULL; longlong subscriber::m_startEid = 0; std::string subscriber::m_selector; int main (int argc, char** argv) { return subscriber::Main (argc, argv); }