/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ #include "nSampleApp.h" #include "nBaseClientException.h" #include "nChannelAttributes.h" #include "nSession.h" #include "nEventProperties.h" #include "nConsumeEvent.h" #include "nSessionNotConnectedException.h" #include "nRequestTimedOutException.h" #include "nUnexpectedResponseException.h" #include "nChannelNotFoundException.h" #include "nSecurityException.h" #include "nSessionPausedException.h" #include "nQueue.h" #include "nSessionFactory.h" #include "nSessionAttributes.h" #include using namespace com::pcbsys::nirvana::apps; using namespace com::pcbsys::nirvana::client; /** * Pushes events to a nirvana queue */ class pushq : nSampleApp { pushq () : m_bIsOk(true) { m_pAsyncException = new nBaseClientException(""); } private: bool m_bIsOk; nBaseClientException *m_pAsyncException; static pushq *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to publish to * a queue. * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a String[] containing the possible RNAME values * @param nRealmDetail length of the rname array * @param aqueueName the queue name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string& aqueueName, int count, int size) { m_pSelf->constructSession(pRealmDetails, nRealmDetail); try { nChannelAttributes *pNca = new nChannelAttributes(); pNca->setName(aqueueName); nQueue *pQueue = m_pSession->findQueue(pNca); //Create a byte array filled with characters equal to //the message size specified. unsigned char *pBuffer = new unsigned char[size]; for (int x = 0; x < size; x++) { pBuffer[x] = (unsigned char)((x % 90) + 32); } //Construct a sample nEventProperties object and add some sample properties //Instantiate the message to be published with the specified nEventPropeties and byte[] nEventProperties *pProps = new nEventProperties(); pProps->put("key0string", (std::string)"1"); pProps->put("key1int", (int)1); pProps->put("key2long", (longlong)-11); pProps->put("key3bool", true); pProps->put("key4short", (short)1); pProps->put("key5float", (float)1.123456); pProps->put("key6double", (double)1.123456); pProps->put("key7char", '1'); nConsumeEvent *pEvt1 = new nConsumeEvent(pProps, pBuffer, size); //Inform the user that publishing is about to start printf("Starting publish of %d events with a size of %d bytes each\n", count, size); //Get a timestamp to be used to calculate the message publishing rates struct timeb startTime; ftime (&startTime); unsigned long start = (unsigned long) startTime.time * 1000 + startTime.millitm; //Loop as many times as the number of messages we want to publish for (int x = 0; x < count; x++) { try { //Publish the event pQueue->push(pEvt1); } catch (nSessionNotConnectedException ex) { while (!m_pSession->isConnected()) { printf("Disconnected from Nirvana, Sleeping for 1 second...\n"); try { Poco::Thread::sleep(1000); } catch (Exception e) { } } x--; } catch (nBaseClientException ex) { printf("Publish : Exception : %s\n", ex.message().c_str()); throw ex; } if (!m_bIsOk) { throw m_pAsyncException; } } if (!m_bIsOk) { throw m_pAsyncException; } //Calculate the actual number of events published by obtaining //the server time //This also ensures that all client queues have been flushed. m_pSession->getServerTime(); struct timeb endTime; ftime (&endTime); unsigned long end = (unsigned long) endTime.time * 1000 + endTime.millitm; //Check if an asynchronous exception has been received if (!m_bIsOk) { //If it has, then throw it throw m_pAsyncException; } //Get a timestamp to calculate the publishing rates //Calculate the events / sec rate unsigned long dif = end - start; if(dif==0) { printf("%d events received in 0 seconds",count); } else { longlong eventPerSec = ((count * 1000) / dif); //Calculate the bytes / sec rate longlong bytesPerSec = eventPerSec * size; //Inform the user of the resulting rates printf("Publish Completed in %lu ms :\n", dif); printf("[Events Published = %d] [Events/Second = %lld] [Bytes/Second = %lld]\n", count, eventPerSec, bytesPerSec); printf("Bandwidth data : Bytes Tx [%lld] Bytes Rx [%lld]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } if (pEvt1->delRef()) delete pEvt1; if (pProps->delRef()) delete pProps; delete[] pBuffer; if (pQueue->delRef()) delete pQueue; delete pNca; } catch (nChannelNotFoundException cnf) { printf(cnf.message().c_str()); exit(1); } catch (nSessionPausedException ps) { printf("Session has been paused, please resume the session\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } nSessionFactory::shutdown(); } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 2, rname, channel name if(argc <3) { Usage(); exit(2); } std::string RNAME = argv[1]; std::string queueName = argv[2]; int count = 1000; if (argc > 3) { count = atoi (argv[3]); } int size = 1024; if (argc > 4) { size = atoi(argv[4]); } // // Run the sample app // int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, queueName, count, size); } public: static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new pushq(); //Process command line arguments m_pSelf->processArgs(argc, argv); return 0; } /** * Prints the usage message for this class */ private: static void Usage() { printf("Usage ...\n\n"); printf("pushq [count] [size] \n\n"); printf(" \n\n"); printf(" - the rname of the server to connect to\n"); printf(" - Queue name parameter for the queue to publish to\n"); printf("\n[Optional Arguments] \n\n"); printf("[count] -The number of events to publish (default: 10)\n"); printf("[size] - The size (bytes) of the event to publish (default: 100)\n"); } }; pushq* pushq::m_pSelf = NULL; int main (int argc, char** argv) { return pushq::Main (argc, argv); }