/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ #include "nSampleApp.h" #include "nEventListener.h" #include "nChannelWatcher.h" #include "nChannel.h" #include "nConstants.h" #include "nSession.h" #include "nConsumeEvent.h" #include "nChannelAttributes.h" #include "nSessionFactory.h" #include "nChannelNotFoundException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nSessionNotConnectedException.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include #include #include using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; class subscriber : public nSampleApp, public nChannelWatcher { /** * Uses a listener to consume events asynchronously from a nirvana channel. */ private: static subscriber *m_pSelf; static longlong m_startEid; static std::string m_selector; int m_count; unsigned long m_startTime; int m_totalMsgs; int m_reportCount; int m_logLevel; longlong m_byteCount; longlong m_lastEID; nChannel *m_pChannel; /** * This method demonstrates the Nirvana API calls necessary to consume * events from a channel using an asynchronous callback * * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a String[] containing the possible RNAME values * @param nRealmDetail the length of the rname array * @param channelName the channel name to create * @param selector the subscription filter * @param startEid the eid to start subscribing from * @param loglvl the specified log level * @param repCount the specified report count */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string& channelName, std::string& selector, longlong startEid, int loglvl, int repCount) { m_pSelf->constructSession(pRealmDetails, nRealmDetail); m_logLevel = loglvl; m_reportCount = repCount; try { nChannelAttributes *pNca = new nChannelAttributes(); pNca->setName(channelName); // locate the channel m_pChannel = m_pSession->findChannel(pNca); if (m_startEid == -1) { m_startEid = m_pChannel->getLastEID(); } // add the subscriber from the given event id m_pChannel->addSubscriber((nChannelWatcher*)this, selector, startEid); // wait for user input printf("Press any key to quit !\n"); std::cin.ignore(); printf("Finished. Consumed total of %d\n", m_totalMsgs); m_pChannel->removeSubscriber(this); try { // close the session nSessionFactory::close(m_pSession); } catch (Exception ex) { } // close any other sessions in this programs so it can exit nSessionFactory::shutdown(); if (m_pChannel) { delete m_pChannel; m_pChannel = NULL; } } catch (nChannelNotFoundException cnf) { printf(cnf.message().c_str()); exit(1); } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } } int getType () { return fBase::CHANNELWATCHER; } protected: /** * Process the command line args */ void processArgs(int argc, char** argv) { if (argc < 3) { Usage(); exit(2); } std::string channelName = argv[2]; m_startEid = -1; if (argc > 3) m_startEid = atoi (argv[3]); int loglvl = 0; if (argc > 4) loglvl = atoi (argv[4]); int reportCount = 1000; if (argc > 5) { reportCount = atoi(argv[5]); } if (argc > 6) { m_selector = argv[6]; } std::string RNAME = argv[1]; int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, channelName, m_selector, m_startEid, loglvl, reportCount); } public: subscriber () : m_count(-1), m_startTime(0), m_totalMsgs(0), m_reportCount(10000), m_logLevel(0), m_byteCount(0), m_lastEID(0), m_pChannel(NULL) { } /** * Prints the usage message for this class */ void Usage() { printf("Usage ...\n"); printf("subscriber [start eid] [debug] [count] [selector] \n"); printf(" \n\n"); printf(" - URL of realm to connect to\n"); printf(" - Channel name parameter for the channel to subscribe to\n"); printf("\n[Optional Arguments] \n\n"); printf("[start eid] - The Event ID to start subscribing from\n"); printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"); printf("[count] - The number of events to wait before printing out summary information\n"); printf("[selector] - The event filter string to use\n"); } static int Main(int argc, char** argv) { m_pSelf = new subscriber(); m_pSelf->processArgs(argc, argv); return 0; } void purge(longlong start, longlong end, std::string& filter) { printf("Received purge for %lld to %lld filter %s", start, end, filter.c_str()); } /** * A callback is received by the API to this method each time an event is received from * the nirvana channel. Be carefull not to spend too much time processing the message * inside this method, as until it exits the next message can not be pushed. * * @param pEvt An nConsumeEvent object containing the message received from the channel */ void go(nConsumeEvent *pEvt) { if (m_count == -1) { struct timeb tm; ftime (&tm); m_startTime = (unsigned long) tm.time * 1000 + tm.millitm; m_count = 0; } m_count++; m_totalMsgs++; if (m_count == m_reportCount) { m_count = 0; struct timeb tm; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; if (m_logLevel >= 1) { if (end != m_startTime) { unsigned long milli = end - m_startTime; std::string inScale = ""; longlong byteInCount = m_pSession->getInputByteCount(); if(byteInCount > 10000000000) { byteInCount = byteInCount / 1000000000; inScale = "GB"; } else if(byteInCount > 10000000) { byteInCount = byteInCount / 1000000; inScale = "MB"; } else if(byteInCount > 10000) { inScale = "KB"; byteInCount = byteInCount/1000; } std::string outScale = ""; longlong byteOutCount = m_pSession->getOutputByteCount(); if(byteOutCount > 10000000000) { byteOutCount = byteInCount / 1000000000; outScale = "GB"; } else if(byteOutCount > 10000000) { byteOutCount = byteOutCount / 1000000; outScale = "MB"; } else if(byteOutCount > 10000) { outScale = "KB"; byteOutCount = byteOutCount/1000; } std::cout <<"Received "<getEventID()) { printf("Expired event range %lld - %lld\n", m_lastEID, (pEvt->getEventID() - 1)); m_lastEID = pEvt->getEventID() + 1; } else { m_lastEID++; } int length = pEvt->getEventDataLength(); unsigned char *pBuffer = pEvt->getEventData(); if (pBuffer != NULL) { m_byteCount += length; } if (m_logLevel >= 2) { printf("Event id : %lld\n", pEvt->getEventID()); if (pEvt->isEndOfChannel()) { printf("End of channel reached\n"); } if (m_logLevel >= 3) { printf("Event tag : %s\n", pEvt->getEventTag().c_str()); unsigned char *pString = pEvt->getEventData(); std::string eventData; nConstants::decode (pString, pEvt->getEventDataLength(), eventData); printf("Event data : %s\n", eventData.c_str()); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } } } } }; subscriber* subscriber::m_pSelf = NULL; longlong subscriber::m_startEid = 0; std::string subscriber::m_selector; int main (int argc, char** argv) { return subscriber::Main (argc, argv); }