/* \n Copyright 1999-2011 (c) My-Channels \n Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. \n\n Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.\n */\n #include "nSampleApp.h" #include "nDataStreamListener.h" #include "nEventAttributes.h" #include "nDataGroup.h" #include "nSession.h" #include "nRegisteredEvent.h" #include "nEventProperties.h" #include "nSessionNotConnectedException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include "nSessionFactory.h" #include "nConsumeEvent.h" #include "Poco/Thread.h" #include "time.h" #include "sys/timeb.h" #ifdef WIN32 #include #endif using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; using namespace Poco; class DataGroupDeltaDelivery : nSampleApp, nDataStreamListener { private: bool m_bOk; nBaseClientException *m_pAsyncException; static DataGroupDeltaDelivery *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to write to a * data group. Each time a message is written, only those properties which * have changed will be sent to the server. It is called after all command * line arguments have been received and validated. * * @param pRealmDetails a string array containing the possible RNAME values * @param nRealmDetail the size of the pRealmDetails array * @param count the number of iterations */ void doit(std::string *pRealmDetails, int nRealmDetail, int count) { m_pSelf->constructSession(pRealmDetails, nRealmDetail, this); try { //Create a data group nDataGroup *pAud_dg = m_pSession->createDataGroup("AUDGroup"); nDataGroup *pGbp_dg = m_pSession->createDataGroup("GBPGroup"); nDataGroup *pEur_dg = m_pSession->createDataGroup("EURGroup"); //add our stream to the groups so that we receive callbacks when an event is received pAud_dg->add(m_pStream); pGbp_dg->add(m_pStream); pEur_dg->add(m_pStream); printf("Creating and registering events\n"); nRegisteredEvent *pAud = pAud_dg->createRegisteredEvent(); nEventProperties *pProps = pAud->getProperties(); pProps->put("bid", 0.1111); pProps->put("offer", 0.1112); pProps->put("desc", std::string("Australian Dollar")); pProps->put("time", getTimeNow()); nRegisteredEvent *pGbp = pGbp_dg->createRegisteredEvent(); pProps = pGbp->getProperties(); pProps->put("bid", 0.2222); pProps->put("offer", 0.2223); pProps->put("desc", std::string("English Pound")); pProps->put("time", getTimeNow()); nRegisteredEvent *pEur = pEur_dg->createRegisteredEvent(); pProps = pEur->getProperties(); pProps->put("bid", 0.3333); pProps->put("offer", 0.3334); pProps->put("desc", std::string("Euro")); pProps->put("time", getTimeNow()); // The events are now registered with the server so commit them to the server pAud->commitChanges(true); pGbp->commitChanges(true); pEur->commitChanges(true); printf("All events are now registered and committed to the server\n"); Thread::sleep(5000); //Loop as many times as the number of messages we want to publish for(int i = 0; i < count; i ++) { try { // Both name and bid will be sent to the server as we readd them on each iteration. // The server will see that these properties have not changed so will not send the // events to the subscriptions pAud->getProperties()->put("bid", 0.5555); pAud->getProperties()->put("name", std::string("James")); pAud->commitChanges(); Thread::sleep(5000); // On the first iteration the consumer will receive all properties of the event because // each value has changed. On subsequent iterations, the bid and offer do not change // so the consumer will only receive the 'time' property pGbp->getProperties()->put("offer", 0.6666); pGbp->getProperties()->put("bid", 0.6667); pGbp->getProperties()->put("time", getTimeNow()); pGbp->commitChanges(); Thread::sleep(5000); //Because 'true' is passed to commitChanges, the current event registered on the server //will be overwritten with this event. Therefore the consumer will receive all properties //on each iteration. pEur->getProperties()->put("offer", 0.7777); pEur->getProperties()->put("bid", 0.7778); pEur->getProperties()->put("time", getTimeNow()); pEur->commitChanges(true);//Overwrites the current event on the server Thread::sleep(5000); } catch (nSessionNotConnectedException ex) { while (!m_pSession->isConnected()) { printf("Disconnected from Nirvana, Sleeping for 1 second...\n"); try { Thread::sleep(1000); } catch (Exception e) { } } } catch (nBaseClientException ex) { printf("Exception : %s\n", ex.message().c_str()); throw ex; } //Check if an asynchronous exception has been received if (!m_bOk) { //If it has, then throw it throw *m_pAsyncException; } } } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory::shutdown(); } public: void onMessage(nConsumeEvent *pEvt) { printf("Group : %s\n", pEvt->getDataGroupName().c_str()); printf("Event id : %d\n", pEvt->getEventID()); printf("Event tag : %s\n", pEvt->getEventTag().c_str()); if(pEvt->getAttributes()->isDelta()) printf("Is Delta : true\n"); else printf("Is Delta : false\n"); //Print the message data if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } printf("\n"); } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of rname if (argc < 2) { Usage(); exit(2); } std::string RNAME = argv[1]; int count = 10; if(argc>2) count = atoi(argv[2]); // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, count); delete[] pRproperties; } public: static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new DataGroupDeltaDelivery(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Prints the usage message for this class */ private: static void Usage() { printf("Usage ...\n\n"); printf("DataGroupDeltaDelivery \n\n"); printf(" \n\n"); printf(" - the rname of the server to connect to\n\n"); printf("[Optional Arguments]\n\n"); printf("[count] - the number of times to commit the registered events (default : 10)"); } std::string getTimeNow() { struct timeb now; ftime (&now); struct tm *pTm = localtime (&now.time); char buffer[256]; sprintf (buffer, "%02d:%02d:%02d.%03d", pTm->tm_hour, pTm->tm_min, pTm->tm_sec, now.millitm); std::string strNow = buffer; return strNow; } DataGroupDeltaDelivery() : m_bOk(true), m_pAsyncException(NULL) { } }; DataGroupDeltaDelivery* DataGroupDeltaDelivery::m_pSelf = NULL; int main (int argc, char** argv) { return DataGroupDeltaDelivery::Main (argc, argv); }