/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ #include "nSampleApp.h" #include "nDataGroupListener.h" #include "nDataStreamListener.h" #include "nSession.h" #include "nDataGroup.h" #include "nEventProperties.h" #include "nConsumeEvent.h" #include "nSessionNotConnectedException.h" #include "nBaseClientException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nSessionFactory.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include #include #include "Poco/Thread.h" #include using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; class DataGroupPublisher : public nSampleApp, public nDataGroupListener, public nDataStreamListener { private: bool m_bOk; nBaseClientException *m_pAsyncException; nDataGroup *m_pGroup; bool m_bHasStreams; static DataGroupPublisher *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to publish to * an nDataGroup. * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param groupName the data group name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published */ private: void doit(std::string *pRealmDetails, int nRealmDetail, const std::string& groupName, int count, int size) { m_pSelf->constructSession(pRealmDetails, nRealmDetail, this); try { std::list *pGrps = m_pSession->getDataGroups(this); for (std::list::iterator iterator = pGrps->begin(); iterator != pGrps->end(); iterator++) { nDataGroup *pGrp = *iterator; if (pGrp->getName().compare(groupName) == 0) { m_pGroup = pGrp; m_pGroup->addListener(this); break; } if (pGrp->delRef()) delete pGrp; } delete pGrps; // if group not found, create it if (m_pGroup == NULL) m_pGroup = m_pSession->createDataGroup(groupName, this); // get the default group to determine how many stream users are connected nDataGroup *pDefaultGroup = m_pSession->getDefaultDataGroup(this); // TODO: refs // just add each connected stream to the group std::list *pStreams = pDefaultGroup->getStreams(); m_bHasStreams = (m_pGroup->size() > 0); for (std::list::iterator streamIterator = pStreams->begin(); streamIterator != pStreams->end(); streamIterator++) { nDataStream *pStream = *streamIterator; if (pStream->getName().compare(m_pSession->getStreamId()) != 0) { m_pGroup->add(pStream); } } //Create a byte array filled with characters equal to // the message size specified. This could be a result //of String.getBytes() call in a real world scenario. unsigned char *pBuffer = new unsigned char[size]; for (int x = 0; x < size; x++) { pBuffer[x] = (unsigned char)((x % 90) + 32); } //Instantiate the message to be published with the specified nEventPropeties / byte[] nEventProperties *pProps = new nEventProperties(); //You can add other types in a dictionary object pProps->put("key0string", "1"); pProps->put("key1int", (int)1); pProps->put("key2long", (longlong)-11); nConsumeEvent *pEvt1 = new nConsumeEvent(pProps, pBuffer, size); delete[] pBuffer; //Inform the user that publishing is about to start std::cout << "Starting publish of "<writeDataGroup(pEvt1, m_pGroup); } catch (nSessionNotConnectedException ex) { while (!m_pSession->isConnected()) { std::cout << "Disconnected from Nirvana, Sleeping for 1 second..." <delRef()) delete pProps; if (pEvt1->delRef()) delete pEvt1; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; //Get a timestamp to calculate the publishing rates //Calculate the events / sec rate unsigned long dif = end - start; long eventPerSec = ((count * 1000) / dif); //Calculate the bytes / sec rate long bytesPerSec = eventPerSec * size; //Inform the user of the resulting rates std::cout <<"Publish Completed in "<getOutputByteCount() <<"] Bytes Rx ["<< m_pSession->getInputByteCount() <<"]" <delRef()) delete m_pGroup; } void onMessage(nConsumeEvent *pEvt) { } // new stream added to default data group void addedStream(nDataGroup *pGroup, nDataStream *pStream, int count) { std::cout << pGroup->getName() <<" : New stream "<< pStream->getName() <<" : "<< pStream->getSubject() <<" added, size now "<< count <getName().compare(m_pGroup->getName()) == 0) && (count > 0)) { m_bHasStreams = true; std::cout << pGroup->getName() <<" : "<< count <<" Streams are registered, starting publisher thread" <getName().compare(m_pSession->getStreamId()) != 0) { m_pGroup->add(pStream); } } } // stream removed from data group void deletedStream(nDataGroup *pGroup, nDataStream *pStream, int count, bool serverRemoved) { std::cout << pGroup->getName() <<" : Data stream "<< pStream->getName() <<" : "<< pStream->getSubject() <<" removed, size now "<< count <getName().compare(m_pGroup->getName()) == 0) && (count == 0)) { m_bHasStreams = false; std::cout << "No streams are registered, stopping publisher thread" <getName().compare(m_pGroup->getName()) == 0) && (count > 0)) { m_bHasStreams = true; std::cout << pGroup->getName() <<" : "<< count <<" Streams are registered, starting publisher thread" <getName() << " : Data group " << pGroup->getName() << " added, size now "<getName() << " : Data group " << pGroup->getName() << " added, size now "<getName() << std::endl; } void createdGroup(nDataGroup *pGroup) { std::cout << "Data group created : " << pGroup->getName() < 3) { count = atoi (argv[3]); } int size = 1024; if (argc > 4) { size = atoi (argv[4]); } // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, groupName, count, size); delete[] pRproperties; } public: static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new DataGroupPublisher(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Prints the usage message for this class */ private: static void Usage() { std::cout <<"Usage ...\n\n" <<"DataGroupPublish [count] [size] \n\n" <<" \n\n" <<" - the rname of the server to connect to\n" <<" - Data group name parameter to publish to\n" <<"\n[Optional Arguments] \n\n" <<"[count] -The number of events to publish (default: 10)\n" <<"[size] - The size (bytes) of the event to publish (default: 100)" <