/* \n Copyright 1999-2011 (c) My-Channels \n Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. \n\n Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.\n */\n #include "nSampleApp.h" #include "nEventListener.h" #include "nEventAttributes.h" #include "nSession.h" #include "nRegisteredEvent.h" #include "nEventProperties.h" #include "nSessionNotConnectedException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nUnexpectedResponseException.h" #include "nIllegalArgumentException.h" #include "nRequestTimedOutException.h" #include "nUnknownRemoteRealmException.h" #include "nChannelNotFoundException.h" #include "nChannelAlreadySubscribedException.h" #include "nIllegalChannelMode.h" #include "nSessionFactory.h" #include "nConsumeEvent.h" #include "nChannel.h" #include "nChannelAttributes.h" #include "nChannelPublishKeys.h" #include "nChannelAlreadyExistsException.h" #include #include "Poco/Thread.h" #include "time.h" #include "sys/timeb.h" #ifdef WIN32 #define _CRTDBG_MAP_ALLOC #include #endif using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; using namespace Poco; class RegisteredEvent : nSampleApp, nEventListener { private: bool m_bOk; nBaseClientException *m_pAsyncException; static RegisteredEvent *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to publish to * a channel. * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a string array containing the possible RNAME values * @param nRealmDetail the size of the pRealmDetails array * @param achannelName the name of the channel to publish to * @param count the number of times to commit the registered events */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string achannelName, int count) { m_pSelf->constructSession(pRealmDetails, nRealmDetail); try { nChannelAttributes* nca = new nChannelAttributes(); nChannel* myChannel; try { nca->setName(achannelName); myChannel = m_pSession->findChannel(nca); } catch (nIllegalArgumentException e) { printf("Channel name not recognised."); printf("Exception : %s\n", e.message().c_str()); } catch (nUnknownRemoteRealmException e1) { printf("Realm not recognised."); printf("Exception : %s\n", e1.message().c_str()); } catch (nChannelNotFoundException e2) { nChannelAttributes* cattr; try { cattr = new nChannelAttributes(achannelName, 0, 0, nChannelAttributes::SIMPLE_TYPE); cattr->useMergeEngine(true); nChannelPublishKeys *pks = {new nChannelPublishKeys("ccy", 1)}; cattr->setPublishKeys(&pks,1); myChannel = m_pSession->createChannel(cattr); } catch (nIllegalArgumentException e) { printf("Channel attributes not recongised"); printf("Exception : %s\n", e.message().c_str()); } catch (nUnknownRemoteRealmException e) { printf("Realm could not be found"); printf("Exception : %s\n", e.message().c_str()); } catch (nChannelAlreadyExistsException e) { printf("Channel already exists."); printf("Exception : %s\n", e.message().c_str()); } } // Create a byte array filled with characters equal to // the message size specified. This could be a result // of String.getBytes() call in a real world scenario. // Instantiate the message to be published with the specified // nEventPropeties / byte[] try { myChannel->addSubscriber(this); } catch (nIllegalArgumentException e1) { printf("Invalid channel subscriber."); printf("Exception : %s\n", e1.message().c_str()); } catch (nChannelNotFoundException e1) { printf("Could not find channel."); printf("Exception : %s\n", e1.message().c_str()); } catch (nChannelAlreadySubscribedException e1) { printf("CAlready subscribed to channel"); printf("Exception : %s\n", e1.message().c_str()); } printf("Creating and registering events\n"); nRegisteredEvent* pAud = NULL; nRegisteredEvent* pGbp = NULL; nRegisteredEvent* pEur = NULL; try{ pAud = myChannel->createRegisteredEvent(new nObject(std::string("AUD"))); nEventProperties *pProps = pAud->getProperties(); pProps->put("bid", 0.1111); pProps->put("offer", 0.1112); pProps->put("desc", std::string(std::string("Australian Dollar"))); pProps->put("time", getTimeNow()); pGbp = myChannel->createRegisteredEvent(new nObject(std::string("GBP"))); pProps = pGbp->getProperties(); pProps->put("bid", 0.2222); pProps->put("offer", 0.2223); pProps->put("desc", std::string("English Pound")); pProps->put("time", getTimeNow()); pEur = myChannel->createRegisteredEvent(new nObject(std::string("EUR"))); pProps = pEur->getProperties(); pProps->put("bid", 0.3333); pProps->put("offer", 0.3334); pProps->put("desc", std::string("Euro")); pProps->put("time", getTimeNow()); // The events are now registered with the channel so commit them to the server pAud->commitChanges(true); pGbp->commitChanges(true); pEur->commitChanges(true); } catch(Exception e){ printf("Exception : %s\n", e.message().c_str()); exit(1); } printf("All events are now registered and committed to the server\n"); Thread::sleep(5000); //Loop as many times as the number of messages we want to publish int published = 0; while (published < count) { try { // Both name and bid will be sent to the server as we readd them on each iteration. // The server will see that these properties have not changed so will not send the // events to the subscriptions pAud->getProperties()->put("bid", 0.5555); pAud->getProperties()->put("name", std::string("James")); pAud->commitChanges(); Thread::sleep(2000); // On the first iteration the consumer will receive all properties of the event because // each value has changed. On subsequent iterations, the bid and offer do not change // so the consumer will only receive the 'time' property pGbp->getProperties()->put("offer", 0.6666); pGbp->getProperties()->put("bid", 0.6667); pGbp->getProperties()->put("time", getTimeNow()); pGbp->commitChanges(); Thread::sleep(2000); //Because 'true' is passed to commitChanges, the current event registered on the server //will be overwritten with this event. Therefore the consumer will receive all properties //on each iteration. pEur->getProperties()->put("offer", 0.7777); pEur->getProperties()->put("bid", 0.7778); pEur->getProperties()->put("time", getTimeNow()); pEur->commitChanges(true);//Overwrites the current event on the server Thread::sleep(2000); published++; } catch (nSessionNotConnectedException ex) { while (!m_pSession->isConnected()) { printf("Disconnected from Nirvana, Sleeping for 1 second...\n"); try { Thread::sleep(1000); } catch (Exception e) { } } } catch (nBaseClientException ex) { printf("Exception : %s\n", ex.message().c_str()); throw ex; } //Check if an asynchronous exception has been received if (!m_bOk) { //If it has, then throw it throw *m_pAsyncException; } } } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory::shutdown(); } public: void go(nConsumeEvent *pEvt) { printf("Channel Name : %s\n", pEvt->getChannelName().c_str()); printf("Event id : %d\n", pEvt->getEventID()); printf("Event tag : %s\n", pEvt->getEventTag().c_str()); if(pEvt->getAttributes()->isDelta()) printf("Is Delta : true\n"); else printf("Is Delta : false\n"); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } printf("\n"); } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 2, rname if (argc < 3) { Usage(); exit(1); } std::string RNAME = argv[1]; std::string channame = argv[2]; int count=10; if(argc > 3) count=atoi(argv[3]); // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, channame,count); delete[] pRproperties; } public: int getType(){ return 0; } static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new RegisteredEvent(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Prints the usage message for this class */ private: static void Usage() { printf("Usage ...\n\n"); printf("RegisteredEvent [count] \n\n"); printf(" \n\n"); printf(" - Rname of the server to connect to\n"); printf(" - Channel name parameter for the channel to publish to\n"); printf("\n[Optional Arguments]\n\n"); printf("[count] -The number of events to publish (default: 10)\n"); } std::string getTimeNow() { struct timeb now; ftime (&now); struct tm *pTm = localtime (&now.time); char buffer[256]; sprintf (buffer, "%02d:%02d:%02d.%03d", pTm->tm_hour, pTm->tm_min, pTm->tm_sec, now.millitm); std::string strNow = buffer; return strNow; } RegisteredEvent() : m_bOk(true), m_pAsyncException(NULL) { } }; RegisteredEvent* RegisteredEvent::m_pSelf = NULL; int main (int argc, char** argv) { return RegisteredEvent::Main (argc, argv); }