/*
 *
 *   Copyright (c) 1999 - 2011 my-Channels Ltd
 *   Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 *   Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
#include "nSampleApp.h"
#include "nConstants.h"
#include "nBaseClientException.h"
#include "nChannelAttributes.h"
#include "nChannel.h"
#include "nSession.h"
#include "nEventProperties.h"
#include "nConsumeEvent.h"
#include "nSessionNotConnectedException.h"
#include "nSessionFactory.h"
#include "nRequestTimedOutException.h"
#include "nUnexpectedResponseException.h"
#include "nChannelNotFoundException.h"
#include "nSecurityException.h"
#include "nSessionPausedException.h"
#include "nSessionAttributes.h"
#include "Poco/Thread.h"

#include <sys/timeb.h>

#include <cstdio>
#include <cstdlib>
#include <string>

using namespace com::pcbsys::nirvana::apps;
using namespace com::pcbsys::nirvana::client;

/**
 * Sample application that publishes messages to a channel.
 *
 * Command line: publish <rname> <channel name> [count] [size]
 */
class publish : nSampleApp
{
    /**
     * Creates the sample with its "ok" flag set and an empty placeholder
     * exception that doit() throws if the flag is ever cleared.
     */
    publish () : m_bIsOk(true)
    {
        m_pAsyncException = new nBaseClientException("");
    }

private:

    bool m_bIsOk;                             // doit() aborts the publish loop when this is false
    nBaseClientException *m_pAsyncException;  // thrown (by value) by doit() when m_bIsOk is false
    static publish *m_pSelf;                  // the single instance created by Main()

    /**
     * This method demonstrates the Nirvana API calls necessary to publish to
     * a channel.
     * It is called after all command line arguments have been received and
     * validated
     *
     * On any handled client exception the process prints a diagnostic and
     * terminates with exit code 1.
     *
     * @param pRealmDetails a String[] containing the possible RNAME values
     * @param nRealmDetail length of the rname array
     * @param achannelName the channel name to publish to
     * @param count the number of messages to publish
     * @param size the size (bytes) of each message to be published
     */
    void doit(std::string *pRealmDetails, int nRealmDetail, std::string& achannelName, int count, int size)
    {
        nConstants::setWriteHandlerType(nConstants::m_sDirectWriteHandler);
        m_pSelf->constructSession(pRealmDetails, nRealmDetail);

        try
        {
            nChannelAttributes *pNca = new nChannelAttributes();
            pNca->setName(achannelName);
            nChannel *pChannel = m_pSession->findChannel(pNca);

            //Create a byte array filled with characters equal to
            //the message size specified.
            unsigned char *pBuffer = new unsigned char[size];
            for (int x = 0; x < size; x++)
            {
                pBuffer[x] = (unsigned char)((x % 90) + 32);
            }

            //Construct a sample nEventProperties object and add some sample properties
            nEventProperties *pProps = new nEventProperties();
            pProps->put("key0string", (std::string)"1");
            pProps->put("key1int", (int)1);
            pProps->put("key2long", (longlong)-11);
            pProps->put("key3bool", true);
            pProps->put("key4short", (short)1);
            pProps->put("key5float", (float)1.123456);
            pProps->put("key6double", (double)1.123456);
            pProps->put("key7char", (unsigned char) '1');

            //Instantiate the message to be published with the specified nEventPropeties and byte[]
            nConsumeEvent *pEvt1 = new nConsumeEvent(pProps, pBuffer, size);

            printf("Starting publish of %d events with a size of %d bytes each\n", count, size);

            longlong startEid = pChannel->getLastEID();

            struct timeb startTime;
            ftime (&startTime);
            unsigned long start = (unsigned long) startTime.time * 1000 + startTime.millitm;

            for (int x = 0; x < count; x++)
            {
                try
                {
                    // publish the event
                    pChannel->publish(pEvt1);
                }
                catch (nSessionNotConnectedException &)
                {
                    // Wait until the session reports it is connected again, then
                    // step the counter back so this event is published again.
                    while (!m_pSession->isConnected())
                    {
                        printf("Disconnected from Nirvana, Sleeping for 1 second...\n");
                        try
                        {
                            Poco::Thread::sleep(1000);
                        }
                        catch (Exception &)
                        {
                            // Deliberately ignored: a failed sleep only shortens the wait.
                        }
                    }
                    x--;
                }
                catch (nBaseClientException &ex)
                {
                    printf("Publish : Exception : %s\n", ex.message().c_str());
                    // Rethrow the original object (not a sliced copy) so the
                    // type-specific handlers below can still match it.
                    throw;
                }

                if (!m_bIsOk)
                {
                    // Throw the exception object rather than the pointer so the
                    // handlers below, which catch exception objects, can match it.
                    throw *m_pAsyncException;
                }
            }

            if (!m_bIsOk)
            {
                throw *m_pAsyncException;
            }

            printf("Get Last EID\n");

            //Calculate the actual number of events published by obtaining
            //the server time
            //This also ensures that all client queues have been flushed.
            longlong events = pChannel->getLastEID() - startEid;

            struct timeb endTime;
            ftime (&endTime);
            unsigned long end = (unsigned long) endTime.time * 1000 + endTime.millitm;
            unsigned long dif = end - start;

            if (dif == 0)
            {
                printf ("Time difference less than one millisecond\n");
            }
            else
            {
                // Divide in signed arithmetic so the signed event count is not
                // converted to an unsigned type by the division.
                longlong eventPerSec = (events * 1000) / static_cast<longlong>(dif);
                longlong bytesPerSec = eventPerSec * size;

                printf("Publish Completed in %lu ms :\n", dif);
                printf("[Events Published = %lld]  [Events/Second = %lld]  [Bytes/Second = %lld]\n", events, eventPerSec, bytesPerSec);
                printf("Bandwidth data : Bytes Tx [%lld] Bytes Rx [%lld]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount());
            }

            // The event and property objects are only deleted when delRef()
            // returns true.
            if (pEvt1->delRef())
                delete pEvt1;
            if (pProps->delRef())
                delete pProps;

            delete[] pBuffer;
            delete pChannel;
            delete pNca;
        }
        catch (nChannelNotFoundException &cnf)
        {
            // Never pass the message as the format string itself.
            printf("%s", cnf.message().c_str());
            exit(1);
        }
        catch (nSessionPausedException &)
        {
            printf("Session has been paused, please resume the session\n");
            exit(1);
        }
        catch (nSecurityException &)
        {
            printf("Unsufficient permissions for the requested operation.\n");
            printf("Please check the ACL settings on the server.\n");
            exit(1);
        }
        catch (nSessionNotConnectedException &)
        {
            printf("The session object used is not physically connected to the Nirvana realm.\n");
            printf("Please ensure the realm is up and check your RNAME value.\n");
            exit(1);
        }
        catch (nUnexpectedResponseException &)
        {
            printf("The Nirvana REALM has returned an unexpected response.\n");
            printf("Please ensure the Nirvana REALM and client API used are compatible.\n");
            exit(1);
        }
        catch (nRequestTimedOutException &)
        {
            printf("The requested operation has timed out waiting for a response from the REALM.\n");
            printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n");
            exit(1);
        }
        catch (nBaseClientException &bce)
        {
            // Any other client error, including the one thrown when m_bIsOk is
            // false: report it instead of letting it leave doit() unhandled.
            printf("Unexpected Nirvana client error : %s\n", bce.message().c_str());
            exit(1);
        }

        try
        {
            //Close the session we opened
            nSessionFactory::close(m_pSession);
        }
        catch (nRequestTimedOutException &)
        {
            // Best effort: the process is about to finish anyway.
        }
        catch (Exception &)
        {
            // Best effort: the process is about to finish anyway.
        }

        nSessionFactory::shutdown();
    }

protected:

    /**
     * Checks that the two required arguments (rname and channel name) are
     * present; otherwise prints the usage text and exits with code 2.
     */
    virtual void processArgs(int argc, char** argv)
    {
        if (argc < 3)
        {
            Usage();
            exit(2);
        }
    }

public:

    /**
     * Entry point of the sample: parses the command line, builds the realm
     * property list and runs the publish.
     *
     * @param argc number of command line arguments
     * @param argv argv[1] = rname, argv[2] = channel name,
     *             optional argv[3] = count, optional argv[4] = size
     * @return 0 when doit() returns
     */
    static int Main(int argc, char** argv)
    {
        m_pSelf = new publish();
        m_pSelf->processArgs(argc, argv);

        std::string channelName = argv[2];

        int count = 10;
        int size = 100;

        if (argc > 3)
        {
            count = atoi (argv[3]);
        }
        if (argc > 4)
        {
            size = atoi (argv[4]);
        }

        // A negative size would make the buffer allocation in doit() fail;
        // reject nonsensical values up front.
        if (count < 0 || size < 0)
        {
            Usage();
            exit(2);
        }

        std::string RNAME = argv[1];
        int nRproperty = 0;
        std::string *pRproperties = parseRealmProperties(RNAME, nRproperty);

        m_pSelf->doit(pRproperties, nRproperty, channelName, count, size);

        return 0;
    }

private:

    /**
     * Prints the usage message for this class
     */
    static void Usage()
    {
        printf("Usage ...\n\n");
        printf("publish <rname> <channel name> [count] [size] \n\n");
        printf("<Required Arguments> \n\n");
        printf("<rname> - the rname of the server to connect to\n");
        printf("<channel name> - Channel name parameter for the channel to publish to\n");
        printf("\n[Optional Arguments] \n\n");
        printf("[count] -The number of events to publish (default: 10)\n");
        printf("[size] - The size (bytes) of the event to publish (default: 100)\n");
    }
};

publish* publish::m_pSelf = NULL;

int main (int argc, char** argv)
{
    return publish::Main (argc, argv);
}