/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ #include "nSampleApp.h" #include "nEventListener.h" #include "nEventAttributes.h" #include "nSession.h" #include "nRegisteredEvent.h" #include "nEventProperties.h" #include "nSessionNotConnectedException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nUnexpectedResponseException.h" #include "nIllegalArgumentException.h" #include "nRequestTimedOutException.h" #include "nUnknownRemoteRealmException.h" #include "nChannelNotFoundException.h" #include "nChannelAlreadySubscribedException.h" #include "nIllegalChannelMode.h" #include "nSessionFactory.h" #include "nConsumeEvent.h" #include "nChannel.h" #include "nChannelAttributes.h" #include "nChannelPublishKeys.h" #include "nChannelAlreadyExistsException.h" #include #include #include "sys/timeb.h" #include #include #include "Poco/Thread.h" #include using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; class RegisteredEvent : nSampleApp, nEventListener { private: bool m_bOk; nBaseClientException *m_pAsyncException; static RegisteredEvent *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to publish to * a channel. * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a string array containing the possible RNAME values * @param nRealmDetail the size of the pRealmDetails array * @param achannelName the name of the channel to publish to * @param count the number of times to commit the registered events */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string achannelName, int count) { m_pSelf->constructSession(pRealmDetails, nRealmDetail); try { nChannelAttributes* nca = new nChannelAttributes(); nChannel* myChannel; try { nca->setName(achannelName); myChannel = m_pSession->findChannel(nca); } catch (nIllegalArgumentException e) { std::cout << "Channel name not recognised." << "Exception : "<< e.message() <useMergeEngine(true); nChannelPublishKeys *pks = {new nChannelPublishKeys("ccy", 1)}; cattr->setPublishKeys(&pks,1); myChannel = m_pSession->createChannel(cattr); } catch (nIllegalArgumentException e) { std::cout << "Channel attributes not recongised" << "Exception : " << e.message() <addSubscriber(this); } catch (nIllegalArgumentException e1) { std::cout << "Invalid channel subscriber." << "Exception : " << e1.message() <createRegisteredEvent(new nObject(std::string("AUD"))); nEventProperties *pProps = pAud->getProperties(); pProps->put("bid", 0.1111); pProps->put("offer", 0.1112); pProps->put("desc", std::string(std::string("Australian Dollar"))); pProps->put("time", getTimeNow()); pGbp = myChannel->createRegisteredEvent(new nObject(std::string("GBP"))); pProps = pGbp->getProperties(); pProps->put("bid", 0.2222); pProps->put("offer", 0.2223); pProps->put("desc", std::string("English Pound")); pProps->put("time", getTimeNow()); pEur = myChannel->createRegisteredEvent(new nObject(std::string("EUR"))); pProps = pEur->getProperties(); pProps->put("bid", 0.3333); pProps->put("offer", 0.3334); pProps->put("desc", std::string("Euro")); pProps->put("time", getTimeNow()); // The events are now registered with the channel so commit them to the server pAud->commitChanges(true); pGbp->commitChanges(true); pEur->commitChanges(true); } catch(Exception e) { std::cout << "Exception : " << e.message() <getProperties()->put("bid", 0.5555); pAud->getProperties()->put("name", std::string("James")); pAud->commitChanges(); Poco::Thread::sleep(2000); // On the first iteration the consumer will receive all properties of the event because // each value has changed. On subsequent iterations, the bid and offer do not change // so the consumer will only receive the 'time' property pGbp->getProperties()->put("offer", 0.6666); pGbp->getProperties()->put("bid", 0.6667); pGbp->getProperties()->put("time", getTimeNow()); pGbp->commitChanges(); Poco::Thread::sleep(2000); //Because 'true' is passed to commitChanges, the current event registered on the server //will be overwritten with this event. Therefore the consumer will receive all properties //on each iteration. pEur->getProperties()->put("offer", 0.7777); pEur->getProperties()->put("bid", 0.7778); pEur->getProperties()->put("time", getTimeNow()); pEur->commitChanges(true);//Overwrites the current event on the server Poco::Thread::sleep(2000); published++; } catch (nSessionNotConnectedException ex) { while (!m_pSession->isConnected()) { std::cout << "Disconnected from Nirvana, Sleeping for 1 second..."; Poco::Thread::sleep(1000); } } catch (nBaseClientException ex) { std::cout << "Exception : " << ex.message() << std::endl; throw ex; } //Check if an asynchronous exception has been received if (!m_bOk) { //If it has, then throw it throw *m_pAsyncException; } } } catch (nSessionPausedException ps) { std::cout << "Session has been paused, please resume the session" << std::endl; exit(1); } catch (nSecurityException se) { std::cout << "Unsufficient permissions for the requested operation." << std::endl; std::cout << "Please check the ACL settings on the server." << std::endl; exit(1); } catch (nSessionNotConnectedException snce) { std::cout << "The session object used is not physically connected to the Nirvana realm." << std::endl; std::cout << "Please ensure the realm is up and check your RNAME value." << std::endl; exit(1); } catch (nUnexpectedResponseException ure) { std::cout << "The Nirvana REALM has returned an unexpected response." << std::endl; std::cout << "Please ensure the Nirvana REALM and client API used are compatible." << std::endl; exit(1); } catch (nRequestTimedOutException rtoe) { std::cout << "The requested operation has timed out waiting for a response from the REALM." << std::endl; std::cout << "If this is a very busy REALM ask your administrator to increase the client timeout values." << std::endl; exit(1); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory::shutdown(); } public: void go(nConsumeEvent *pEvt) { std::cout << "Channel Name : " << pEvt->getChannelName() <<"\n"; std::cout << "Event id : "<getEventID() << "\n"; std::cout << "Event tag : " << pEvt->getEventTag() << "\n";; if(pEvt->getAttributes()->isDelta()) std::cout << "Is Delta : true\n"; else std::cout << "Is Delta : false\n"; if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } std::cout << std::endl; } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 2, rname if (argc < 3) { Usage(); exit(1); } std::string RNAME = argv[1]; std::string channame = argv[2]; int count=10; if(argc > 3) count=atoi(argv[3]); // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, channame,count); delete[] pRproperties; } public: int getType() { return 0; } static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new RegisteredEvent(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Prints the usage message for this class */ private: static void Usage() { std::cout << "Usage ...\n\n" << "RegisteredEvent [count] \n\n" << " \n\n" << " - Rname of the server to connect to\n" << " - Channel name parameter for the channel to publish to\n" << "\n[Optional Arguments]\n\n" << "[count] -The number of events to publish (default: 10)" << std::endl; } std::string getTimeNow() { struct timeb now; ftime (&now); struct tm *pTm = localtime (&now.time); std::stringstream ss; ss << pTm->tm_hour <<":" << pTm->tm_min <<":"<tm_sec <<"."<< now.millitm; return ss.str(); } RegisteredEvent() : m_bOk(true), m_pAsyncException(NULL) { } }; RegisteredEvent* RegisteredEvent::m_pSelf = NULL; int main (int argc, char** argv) { return RegisteredEvent::Main (argc, argv); }