/*
 *
 *   Copyright (c) 1999 - 2011 my-Channels Ltd
 *   Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
 *
 *   Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG.
 *
 */
#include "nSampleApp.h"
#include "nConstants.h"
#include "nSessionNotConnectedException.h"
#include "nChannelNotFoundException.h"
#include "nSecurityException.h"
#include "nUnexpectedResponseException.h"
#include "nUnknownRemoteRealmException.h"
#include "nRequestTimedOutException.h"
#include "nTransactionAlreadyCommittedException.h"
#include "nTransactionNotStartedException.h"
#include "nChannelAttributes.h"
#include "nSession.h"
#include "nQueue.h"
#include "nEventProperties.h"
#include "nConsumeEvent.h"
#include "nTransactionAttributes.h"
#include "nTransaction.h"
#include "nTransactionFactory.h"
#include "nSessionFactory.h"
#include "Poco/Thread.h"

#include <cstdio>
#include <cstdlib>
#include <list>
#include <sstream>
#include <string>
#include <sys/timeb.h>

using namespace com::pcbsys::nirvana::apps;
using namespace com::pcbsys::nirvana::client;
using namespace com::pcbsys::nirvana::nbase;

/**
 * Sample application that publishes events to a queue transactionally.
 *
 * Command line: txpushq <rname> <queue name> [count] [size] [tx size]
 */
class txpushq : public nSampleApp
{
private:
    // The single application instance; created in Main().
    static txpushq *m_pSelf;

    /**
     * This method demonstrates the Nirvana API calls necessary to publish to
     * a queue transactionally.
     * It is called after all command line arguments have been received and
     * validated.
     *
     * Events are published in count / txSize transactions of txSize events
     * each; any remainder (count % txSize) is not published. If the session
     * disconnects during a transaction, the method waits for the session to
     * reconnect, retries the commit and, if that fails, repeats the
     * transaction.
     *
     * Any other Nirvana exception prints a diagnostic and terminates the
     * process with exit code 1.
     *
     * @param pRealmDetails a String[] containing the possible RNAME values
     * @param nRealmDetail  length of the rname array
     * @param queueName     the queue name to publish to
     * @param count         the number of messages to publish
     * @param size          the size (bytes) of each message to be published (must be >= 0)
     * @param txSize        the number of messages to be published in each transaction (must be >= 1)
     */
    void doit(std::string *pRealmDetails, int nRealmDetail, std::string queueName, int count, int size, int txSize)
    {
        // m_pSession is not declared in this class; presumably it is set up
        // by constructSession() in nSampleApp -- TODO confirm.
        m_pSelf->constructSession(pRealmDetails, nRealmDetail);

        //Publishes to the specified queue
        try
        {
            //Create a channel attributes object
            nChannelAttributes *pNca = new nChannelAttributes();
            pNca->setName(queueName);

            //Obtain a reference to the queue
            nQueue *pQueue = m_pSession->findQueue(pNca);

            //Create a byte array filled with characters equal to
            //the message size specified.
            //NOTE(review): this buffer is filled and released but never
            //attached to an event; the event payload below is the encoded
            //"TXID-n" string. Kept as in the original sample.
            unsigned char *buffer = new unsigned char[size];
            for (int x = 0; x < size; x++)
            {
                buffer[x] = (unsigned char)((x % 90) + 32);
            }

            //Create a List that contains the messages to be published on each
            //transactional publish call
            std::list<nConsumeEvent*> vevents;
            for (int x = 0; x < txSize; x++)
            {
                std::stringstream ss;
                ss << "TXID-" << x;

                unsigned int length = 0;
                unsigned char *txid = fBase::encode(ss.str(), length);

                nEventProperties *pProps = new nEventProperties();
                pProps->put("floatval", (float)1.2345);

                nConsumeEvent *pEvt = new nConsumeEvent(pProps, txid, length); // Note the Tag does NOT need the TX
                vevents.push_back(pEvt);

                // Release our local references now that the event has been built
                delete[] txid;
                if (pProps->delRef())
                    delete pProps;
            }

            //Inform the user that publishing is about to start
            printf("Starting publish of %d events of size %d\n", count, size);

            //Get a timestamp to be used to calculate the message publishing rates
            struct timeb tm;
            ftime(&tm);
            unsigned long start = (unsigned long) tm.time * 1000 + tm.millitm;

            //Create a transaction attributes object based on the queue we wish to publish to
            nTransactionAttributes *pTXAttrib = new nTransactionAttributes(pQueue, 1000);

            //Define a transaction object
            nTransaction *pTx = NULL;

            //Loop as many times as the number of messages we want to publish
            //divided by the number of events per transaction
            const int txCount = count / txSize;
            for (int x = 0; x < txCount; x++)
            {
                try
                {
                    //Create a new transaction object
                    pTx = nTransactionFactory::create(pTXAttrib);

                    //Publish the List of messages created previously
                    pTx->publish(vevents);

                    //Commit the transaction
                    pTx->commit();

                    // Reset the pointer so the recovery path below can never
                    // touch an already deleted transaction.
                    delete pTx;
                    pTx = NULL;
                }
                catch (const nSessionNotConnectedException &)
                {
                    while (!m_pSession->isConnected())
                    {
                        printf("Disconnected from Nirvana, Sleeping for 1 second...\n");
                        try
                        {
                            Poco::Thread::sleep(1000);
                        }
                        catch (const Exception &)
                        {
                            // Best effort: an interrupted sleep just re-checks the connection
                        }
                    }

                    bool committed = false;
                    if (pTx != NULL)
                    {
                        // The transaction was created before the disconnect;
                        // try to commit it on the reconnected session.
                        try
                        {
                            pTx->commit();
                            printf("Commited tx after failover...\n");
                            committed = true;
                        }
                        catch (const Exception &)
                        {
                            // Handled below by repeating this transaction
                        }

                        // Release the transaction whether or not the retry worked
                        delete pTx;
                        pTx = NULL;
                    }

                    if (!committed)
                    {
                        printf("Got error trying to commit transaction as it had started on another Nirvana realm, republishing event..\n");
                        x--; // repeat this iteration
                    }
                }
            }

            //Calculate the actual number of events
            //This also ensures that all client queues have been flushed.
            m_pSession->getServerTime();

            //Get a timestamp to calculate the publishing rates
            ftime(&tm);
            unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm;
            unsigned long dif = end - start;

            if (dif == 0)
            {
                printf("%d events published in 0 seconds\n", count);
            }
            else
            {
                // The rates are computed in floating point so that
                // count * 1000 cannot overflow an int for large counts.

                //Calculate the events / sec rate
                const double eventRate = ((double)count * 1000.0) / (double)dif;
                long eventPerSec = (long)eventRate;

                //Calculate the bytes / sec rate
                long bytesPerSec = (long)((double)eventPerSec * (double)size);

                //Inform the user of the resulting rates
                printf("Events = %d Events/sec = %ld Bytes/Sec = %ld\n", count, eventPerSec, bytesPerSec);
            }

            // Release everything allocated above
            delete pTXAttrib;
            for (std::list<nConsumeEvent*>::iterator it = vevents.begin(); it != vevents.end(); ++it)
            {
                if ((*it)->delRef())
                    delete (*it);
            }
            delete[] buffer;
            if (pQueue->delRef())
                delete pQueue;
            delete pNca;
        }
        //Handle errors
        catch (const nChannelNotFoundException &)
        {
            printf("The queue specified could not be found.\n");
            printf("Please ensure that the queue exists in the REALM you connect to.\n");
            exit(1);
        }
        catch (const nSecurityException &)
        {
            printf("Unsufficient permissions for the requested operation.\n");
            printf("Please check the ACL settings on the server.\n");
            exit(1);
        }
        catch (const nSessionNotConnectedException &)
        {
            printf("The session object used is not physically connected to the Nirvana realm.\n");
            printf("Please ensure the realm is up and check your RNAME value.\n");
            exit(1);
        }
        catch (const nUnexpectedResponseException &)
        {
            printf("The Nirvana REALM has returned an unexpected response.\n");
            printf("Please ensure the Nirvana REALM and client API used are compatible.\n");
            exit(1);
        }
        catch (const nUnknownRemoteRealmException &)
        {
            printf("The queue specified resided in a remote realm which could not be found.\n");
            printf("Please ensure the queue name specified is correct.\n");
            exit(1);
        }
        catch (const nRequestTimedOutException &)
        {
            printf("The requested operation has timed out waiting for a response from the REALM.\n");
            printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n");
            exit(1);
        }
        catch (const nTransactionAlreadyCommittedException &)
        {
            printf("The transaction you attempted to commit has already been commited on this REALM.\n");
            exit(1);
        }
        catch (const nTransactionNotStartedException &)
        {
            printf("The transaction you attempted to commit has not yet started on this REALM.\n");
            exit(1);
        }
        catch (const nBaseClientException &)
        {
            printf("An error occured while creating the Channel Attributes object.\n");
            exit(1);
        }

        //Close the session we opened
        try
        {
            nSessionFactory::close(m_pSession);
        }
        catch (const Exception &)
        {
            // Best effort: we are shutting down anyway
        }

        //Close any other sessions within this program so that we can exit
        nSessionFactory::shutdown();
    }

protected:
    /**
     * Parses the command line and runs the sample.
     *
     * Expected arguments: <rname> <queue name> [count] [size] [tx size].
     * Exits with code 2 (after printing the usage text) when the required
     * arguments are missing or a numeric argument is out of range.
     *
     * @param argc number of entries in argv
     * @param argv the command line arguments, argv[0] being the program name
     */
    virtual void processArgs(int argc, char** argv)
    {
        //
        // Need a min of 2, rname, channel name
        if (argc < 3)
        {
            Usage();
            exit(2);
        }

        std::string RNAME = argv[1];
        std::string queueName = argv[2];

        int count = 1000;
        if (argc > 3)
        {
            count = atoi(argv[3]);
        }

        int size = 1024;
        if (argc > 4)
        {
            size = atoi(argv[4]);
        }

        int txSize = 1;
        if (argc > 5)
        {
            txSize = atoi(argv[5]);
        }

        // atoi() yields 0 for non-numeric input. A tx size of 0 would divide
        // by zero in doit() and a negative size would be used as an array
        // length, so reject out-of-range values up front.
        if (count < 0 || size < 0 || txSize < 1)
        {
            printf("Invalid argument: count and size must not be negative and tx size must be at least 1.\n\n");
            Usage();
            exit(2);
        }

        //
        // Run the sample app
        //
        int nRproperty = 0;
        std::string *pRproperties = parseRealmProperties(RNAME, nRproperty);
        m_pSelf->doit(pRproperties, nRproperty, queueName, count, size, txSize);
    }

public:
    /**
     * Application entry point: creates the txpushq instance and hands it the
     * command line.
     *
     * @param argc number of entries in argv
     * @param argv the command line arguments
     * @return always 0; error paths terminate the process via exit()
     */
    static int Main(int argc, char** argv)
    {
        //Create an instance for this class
        m_pSelf = new txpushq();

        //Process command line arguments
        m_pSelf->processArgs(argc, argv);

        return 0;
    }

private:
    /**
     * Prints the usage message for this class
     */
    static void Usage()
    {
        printf("Usage ...\n\n");
        printf("txpushq <rname> <queue name> [count] [size] [tx size] \n\n");
        printf("<Required Arguments> \n\n");
        printf("<rname> - the rname of the server to connect to\n");
        printf("<queue name> - Queue name parameter for the queue to publish to\n");
        printf("\n[Optional Arguments] \n\n");
        printf("[count] -The number of events to publish (default: 1000)\n");
        printf("[size] - The size (bytes) of the event to publish (default: 1024)\n");
        printf("[tx size] - The number of events per transaction (default: 1)\n");
    }
};

txpushq* txpushq::m_pSelf = NULL;

int main(int argc, char** argv)
{
    return txpushq::Main(argc, argv);
}