/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ #include "nEventListener.h" #include "nSampleApp.h" #include "nConstants.h" #include "nChannel.h" #include "nNamedObject.h" #include "nSession.h" #include "nConsumeEvent.h" #include "nChannelAttributes.h" #include "nSessionFactory.h" #include "nChannelNotFoundException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nSessionNotConnectedException.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include #include #include using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; class namedsubscriber : public nSampleApp, public nEventListener { /** * Uses a listener to consume events asynchronously from a nirvana channel, using a named object. */ private: static namedsubscriber *m_pSelf; static longlong m_startEid; static std::string m_selector; bool m_persistent; bool m_autoAck; int m_count; unsigned long m_startTime; int m_totalMsgs; int m_reportCount; int m_logLevel; longlong m_byteCount; longlong m_lastEID; nChannel *m_pChannel; /** * This method demonstrates the Nirvana API calls necessary to consume * events from a channel using an asynchronous callback and a named object * * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a String[] containing the possible RNAME values * @param nRealmDetail the length of the rname array * @param channelName the channel name to create * @param name the named object unique name * @param persist whether the named object is persistent * @param auto whether the message acknowledgment is automatic * @param selector the subscription filter * @param startEid the eid to start subscribing from * @param loglvl the specified log level * @param repCount the specified report count */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string& channelName, std::string& namedobj, std::string& selector, longlong startEid, int loglvl, int repCount) { m_pSelf->constructSession(pRealmDetails, nRealmDetail); m_logLevel = loglvl; m_reportCount = repCount; try { nChannelAttributes *pNca = new nChannelAttributes(); pNca->setName(channelName); // locate the channel m_pChannel = m_pSession->findChannel(pNca); if (m_startEid == -1) { m_startEid = m_pChannel->getLastEID(); } nNamedObject *named = m_pChannel->createNamedObject(namedobj, m_startEid, m_persistent, false); // add the subscriber from the given event id m_pChannel->addSubscriber(this, named, selector, m_autoAck); // wait for user input printf("Press any key to quit !\n"); std::cin.ignore(); printf("Finished. Consumed total of %d\n", m_totalMsgs); m_pChannel->removeSubscriber(this); try { // close the session nSessionFactory::close(m_pSession); } catch (Exception ex) { } // close any other sessions in this programs so it can exit nSessionFactory::shutdown(); if (m_pChannel) { delete m_pChannel; m_pChannel = NULL; } } catch (nChannelNotFoundException cnf) { printf(cnf.message().c_str()); exit(1); } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } } int getType () { return 0; } protected: /** * Process the command line args */ void processArgs(int argc, char** argv) { if (argc < 4) { Usage(); exit(2); } std::string channelName = argv[2]; std::string objectName = argv[3]; m_startEid = -1; if (argc > 4) { m_persistent = !strcmp(argv[4], "true"); } if (argc > 5) { m_autoAck = !strcmp(argv[5], "true"); } if (argc > 6) m_startEid = atoi (argv[6]); int loglvl = 0; if (argc > 7) loglvl = atoi (argv[7]); int reportCount = 1000; if (argc > 8) { reportCount = atoi(argv[8]); } if (argc > 9) { m_selector = argv[9]; } std::string RNAME = argv[1]; int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, channelName, objectName, m_selector, m_startEid, loglvl, reportCount); } public: namedsubscriber () : m_persistent(true), m_autoAck(true), m_count(-1), m_startTime(0), m_totalMsgs(0), m_reportCount(10000), m_logLevel(0), m_byteCount(0), m_lastEID(0), m_pChannel(NULL) { } /** * Prints the usage message for this class */ void Usage() { printf("Usage ...\n"); printf("namedsubscriber [persist] [auto] [start eid] [debug] [count] [selector] \n"); printf(" \n\n"); printf(" - the rname of server to connect to\n"); printf(" - Channel name parameter for the channel to subscribe to\n"); printf(" - Unique id of the named object\n"); printf("\n[Optional Arguments] \n\n"); printf("[persist] - If the named object will be stored persistently\n"); printf("[auto] - If messages are acknowledged auomatically or manually\n"); printf("[start eid] - The Event ID to start subscribing from\n"); printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"); printf("[count] - The number of events to wait before printing out summary information\n"); printf("[selector] - The event filter string to use\n"); } static int Main(int argc, char** argv) { m_pSelf = new namedsubscriber(); m_pSelf->processArgs(argc, argv); return 0; } /** * A callback is received by the API to this method each time an event is received from * the nirvana channel. Be carefull not to spend too much time processing the message * inside this method, as until it exits the next message can not be pushed. * * @param pEvt An nConsumeEvent object containing the message received from the channel */ void go(nConsumeEvent *pEvt) { if (!m_autoAck) { pEvt->ack(); } if (m_count == -1) { struct timeb tm; ftime (&tm); m_startTime = (unsigned long) tm.time * 1000 + tm.millitm; m_count = 0; } m_count++; m_totalMsgs++; if (m_count == m_reportCount) { m_count = 0; struct timeb tm; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; if (m_logLevel >= 1) { if (end != m_startTime) { unsigned long milli = end - m_startTime; printf("Received %d in %lu Evt/Sec = %lu Bytes/sec = %llu\n", m_reportCount, milli, ((m_reportCount * 1000) / milli), ((m_byteCount * 1000) / milli)); printf("Bandwidth data : Bytes Tx %llu Bytes Rx [%llu]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } else { printf("Received %d faster than the system millisecond counter\n", m_reportCount); } } m_startTime = end; m_byteCount = 0; } if (m_lastEID != pEvt->getEventID()) { printf("Expired event range %lld - %lld\n", m_lastEID, (pEvt->getEventID() - 1)); m_lastEID = pEvt->getEventID() + 1; } else { m_lastEID++; } int length = pEvt->getEventDataLength(); unsigned char *pBuffer = pEvt->getEventData(); if (pBuffer != NULL) { m_byteCount += length; } if (m_logLevel >= 2) { printf("Event id : %lld\n", pEvt->getEventID()); if (pEvt->isEndOfChannel()) { printf("End of channel reached\n"); } if (m_logLevel >= 3) { printf("Event tag : %s\n", pEvt->getEventTag().c_str()); std::string eventData; nConstants::decode (pEvt->getEventData(), pEvt->getEventDataLength(), eventData); printf("Event data : %s\n", eventData.c_str()); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } } } } }; namedsubscriber* namedsubscriber::m_pSelf = NULL; longlong namedsubscriber::m_startEid = 0; std::string namedsubscriber::m_selector; int main (int argc, char** argv) { return namedsubscriber::Main (argc, argv); }