/* \n Copyright 1999-2011 (c) My-Channels \n Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. \n\n Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.\n */\n #include "nSampleApp.h" #include "nDataGroupListener.h" #include "nDataStreamListener.h" #include "nSession.h" #include "nDataGroup.h" #include "nEventProperties.h" #include "nConsumeEvent.h" #include "nSessionNotConnectedException.h" #include "nBaseClientException.h" #include "nSessionPausedException.h" #include "nSecurityException.h" #include "nSessionFactory.h" #include "nUnexpectedResponseException.h" #include "nRequestTimedOutException.h" #include "Poco/SynchronizedObject.h" #include #ifdef WIN32 #include #endif using namespace com::pcbsys::nirvana::client; using namespace com::pcbsys::nirvana::apps; using namespace Poco; class DataGroupPublisher : public nSampleApp, public nDataGroupListener, public nDataStreamListener { private: bool m_bOk; nBaseClientException *m_pAsyncException; nDataGroup *m_pGroup; bool m_bHasStreams; SynchronizedObject m_mutex; static DataGroupPublisher *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to publish to * an nDataGroup. * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param groupName the data group name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published */ private: void doit(std::string *pRealmDetails, int nRealmDetail, const std::string& groupName, int count, int size) { m_pSelf->constructSession(pRealmDetails, nRealmDetail, this); try { std::list *pGrps = m_pSession->getDataGroups(this); for (std::list::iterator iterator = pGrps->begin(); iterator != pGrps->end(); iterator++) { nDataGroup *pGrp = *iterator; if (pGrp->getName().compare(groupName) == 0) { m_pGroup = pGrp; m_pGroup->addListener(this); break; } if (pGrp->delRef()) delete pGrp; } delete pGrps; // if group not found, create it if (m_pGroup == NULL) m_pGroup = m_pSession->createDataGroup(groupName, this); // get the default group to determine how many stream users are connected nDataGroup *pDefaultGroup = m_pSession->getDefaultDataGroup(this); // TODO: refs // just add each connected stream to the group std::list *pStreams = pDefaultGroup->getStreams(); m_bHasStreams = (m_pGroup->size() > 0); for (std::list::iterator streamIterator = pStreams->begin(); streamIterator != pStreams->end(); streamIterator++) { nDataStream *pStream = *streamIterator; if (pStream->getName().compare(m_pSession->getStreamId()) != 0) { m_pGroup->add(pStream); } } //Create a byte array filled with characters equal to // the message size specified. This could be a result //of String.getBytes() call in a real world scenario. unsigned char *pBuffer = new unsigned char[size]; for (int x = 0; x < size; x++) { pBuffer[x] = (unsigned char)((x % 90) + 32); } //Instantiate the message to be published with the specified nEventPropeties / byte[] nEventProperties *pProps = new nEventProperties(); //You can add other types in a dictionary object pProps->put("key0string", "1"); pProps->put("key1int", (int)1); pProps->put("key2long", (longlong)-11); nConsumeEvent *pEvt1 = new nConsumeEvent(pProps, pBuffer, size); delete[] pBuffer; //Inform the user that publishing is about to start printf("Starting publish of %d events with a size of %d bytes each\n", count, size); //Get a timestamp to be used to calculate the message publishing rates struct timeb tm; ftime (&tm); unsigned long start = (unsigned long) tm.time * 1000 + tm.millitm; //Loop as many times as the number of messages we want to publish for (int x = 0; x < count; x++) { while (!m_bHasStreams) { m_mutex.lock(); printf("No streams are registered, waiting for notification\n"); m_mutex.unlock(); m_mutex.wait(); m_mutex.lock(); if (m_bHasStreams) { printf("Streams are registered starting to publish\n"); } m_mutex.unlock(); } try { //Publish the event m_pSession->writeDataGroup(pEvt1, m_pGroup); } catch (nSessionNotConnectedException ex) { while (!m_pSession->isConnected()) { printf("Disconnected from Nirvana, Sleeping for 1 second...\n"); try { Thread::sleep(1000); } catch (Exception e) { } } x--; //We need to repeat the publish for the event publish that caused the exception, //so we reduce the counter } catch (nBaseClientException ex) { printf("Exception : %s\n", ex.message().c_str()); throw ex; } //Check if an asynchronous exception has been received if (!m_bOk) { //If it has, then throw it throw *m_pAsyncException; } } //Check if an asynchronous exception has been received if (!m_bOk) { //If it has, then throw it throw *m_pAsyncException; } if (pProps->delRef()) delete pProps; if (pEvt1->delRef()) delete pEvt1; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; //Get a timestamp to calculate the publishing rates //Calculate the events / sec rate unsigned long dif = end - start; long eventPerSec = ((count * 1000) / dif); //Calculate the bytes / sec rate long bytesPerSec = eventPerSec * size; //Inform the user of the resulting rates printf("Publish Completed in %d ms :\n", dif); printf("[Events Published = %d] [Events/Second = %d] [Bytes/Second = %d]\n", count, eventPerSec, bytesPerSec); printf("Bandwidth data : Bytes Tx [%d] Bytes Rx [%d]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory::shutdown(); } // callback for DataStreamListener public: ~DataGroupPublisher() { if (m_pGroup && m_pGroup->delRef()) delete m_pGroup; } void onMessage(nConsumeEvent *pEvt) { } // new stream added to default data group void addedStream(nDataGroup *pGroup, nDataStream *pStream, int count) { printf("%s : New stream %s : %s added, size now %d\n", pGroup->getName().c_str(), pStream->getName().c_str(), pStream->getSubject().c_str(), count); if ((pGroup->getName().compare(m_pGroup->getName()) == 0) && (count > 0)) { m_bHasStreams = true; printf("%s : %d Streams are registered, starting publisher thread\n", pGroup->getName().c_str(), count); m_mutex.lock(); m_mutex.notify(); m_mutex.unlock(); } else { if (pStream->getName().compare(m_pSession->getStreamId()) != 0) { m_pGroup->add(pStream); } } } // stream removed from data group void deletedStream(nDataGroup *pGroup, nDataStream *pStream, int count, bool serverRemoved) { printf("%s : Data stream %s : %s removed, size now %d\n", pGroup->getName().c_str(), pStream->getName().c_str(), pStream->getSubject().c_str(), count); if ((pGroup->getName().compare(m_pGroup->getName()) == 0) && (count == 0)) { m_bHasStreams = false; printf("No streams are registered, stopping publisher thread\n"); } else if ((pGroup->getName().compare(m_pGroup->getName()) == 0) && (count > 0)) { m_bHasStreams = true; printf("%s : %d Streams are registered, starting publisher thread\n", pGroup->getName().c_str(), count); } } // new data group added void addedGroup(nDataGroup *pTo, nDataGroup *pGroup, int count) { printf("%s : Data group %s added, size now %d\n", pTo->getName().c_str(), pGroup->getName().c_str(), count); } // data group removed void removedGroup(nDataGroup *pFrom, nDataGroup *pGroup, int count) { printf("%s : Data group %s removed, size now %d\n", pFrom->getName().c_str(), pGroup->getName().c_str(), count); } void deletedGroup(nDataGroup *pGroup) { printf("Data group deleted : %s\n", pGroup->getName().c_str()); } void createdGroup(nDataGroup *pGroup) { printf("Data group created : %s\n", pGroup->getName().c_str()); } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 3, rname, group name if (argc < 3) { Usage(); exit(2); } std::string RNAME = argv[1]; std::string groupName = argv[2]; int count = 1000; if (argc > 3) { count = atoi (argv[3]); } int size = 1024; if (argc > 4) { size = atoi (argv[4]); } // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, groupName, count, size); delete[] pRproperties; } public: static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new DataGroupPublisher(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Prints the usage message for this class */ private: static void Usage() { printf("Usage ...\n\n"); printf("DataGroupPublish [count] [size] \n\n"); printf(" \n\n"); printf(" - the rname of the server to connect to\n"); printf(" - Data group name parameter to publish to\n"); printf("\n[Optional Arguments] \n\n"); printf("[count] -The number of events to publish (default: 10)\n"); printf("[size] - The size (bytes) of the event to publish (default: 100)\n"); } DataGroupPublisher() : m_bOk(true), m_pAsyncException(NULL), m_pGroup(NULL), m_bHasStreams(false) { } }; DataGroupPublisher* DataGroupPublisher::m_pSelf = NULL; int main (int argc, char** argv) { return DataGroupPublisher::Main (argc, argv); }