/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ #include "nSampleApp.h" #include "nConstants.h" #include "nChannelNotFoundException.h" #include "nSecurityException.h" #include "nSessionNotConnectedException.h" #include "nUnexpectedResponseException.h" #include "nUnknownRemoteRealmException.h" #include "nRequestTimedOutException.h" #include "nEventListener.h" #include "nQueue.h" #include "nChannelAttributes.h" #include "nSession.h" #include "nSessionFactory.h" #include "nQueueSyncReader.h" #include "nQueueDetails.h" #include "nQueueReaderContext.h" #include "nConsumeEvent.h" #include "nQueueSyncTransactionReader.h" #include "Poco/Thread.h" #include #include using namespace com::pcbsys::nirvana::apps; using namespace com::pcbsys::nirvana::client; /** * Creates a synchronous queue reader and pops the queue */ class qreader : public nSampleApp, public nEventListener, public Poco::Runnable { public: qreader() : m_timeout(0), m_lastEID(0), m_startTime(0), m_byteCount(0), m_logLevel(0), m_count(-1), m_totalMsgs(0), m_reportCount(1000), m_bIsTx(false), m_pQueue(NULL), m_pReader(NULL), m_pQPopper(NULL) { } private: long m_timeout; static std::string m_selector; longlong m_lastEID; unsigned long m_startTime; long m_byteCount; int m_logLevel; int m_count; int m_totalMsgs; int m_reportCount; bool m_bIsTx; nQueue *m_pQueue; nQueueSyncReader *m_pReader; Poco::Thread *m_pQPopper; bool m_bCont; static qreader *m_pSelf; /** * This method demonstrates the Nirvana API calls necessary to create a * synchronous queue reader * * It is called after all command line arguments have been received and * validated * * @param pRealmDetails a String[] containing the possible RNAME values * @param nRealmDetail length of the rname array * @param queueName the queue name to pop * @param time the timeout for the queue pop * @param loglvl the specified log level * @param repCount the specified report count * @param transactional whether to use a transactional queue reader * @param sel the pop selector filter */ void doit(std::string *pRealmDetails, int nRealmDetail, std::string queueName, long time, int loglvl, int repCount, bool transactional, std::string sel) { m_logLevel = loglvl; m_reportCount = repCount; m_timeout = time; m_bIsTx = transactional; m_bCont = true; m_pSelf->constructSession(pRealmDetails, nRealmDetail); //Subscribes to the specified queue try { //Create a channel attributes object nChannelAttributes *pNca = new nChannelAttributes(); pNca->setName(queueName); //Obtain the queue reference m_pQueue = m_pSession->findQueue(pNca); //output queue details nQueueDetails *pDetails = m_pQueue->getDetails(); printf("Current queue size = %d\n", pDetails->getNoOfEvents()); printf("Current queue age = %lld\n", (pDetails->getLastEventTime() - pDetails->getFirstEventTime())); printf("Current storage size = %lld\n", pDetails->getTotalMemorySize()); printf("Current readers = %d\n", pDetails->getNoOfReaders()); delete pDetails; nQueueReaderContext *pContext = new nQueueReaderContext(this, sel); //create the queue reader if (m_bIsTx) { m_pReader = m_pQueue->createTransactionalReader(pContext); } else { m_pReader = m_pQueue->createReader(pContext); } if (pContext->delRef()) delete pContext; std::string threadName = "Q Reader thread"; m_pQPopper = new Poco::Thread(threadName); m_pQPopper->setPriority (Poco::Thread::PRIO_LOW); m_pQPopper->start(*this); //Stay subscribed until the user presses any key printf("Press any key to quit !\n"); std::cin.ignore(); printf("Finished. Consumed total of %d\n", m_totalMsgs); m_bCont = false; //Destroy the queue reader nQueue::destroyReader(m_pReader); if (m_pQueue->delRef()) delete m_pQueue; delete pNca; } //Handle errors catch (nChannelNotFoundException cnfe) { printf("The queue specified could not be found.\n"); printf("Please ensure that the queue exists in the REALM you connect to.\n"); exit(1); } catch (nSecurityException se) { printf("Unsufficient permissions for the requested operation.\n"); printf("Please check the ACL settings on the server.\n"); exit(1); } catch (nSessionNotConnectedException snce) { printf("The session object used is not physically connected to the Nirvana realm.\n"); printf("Please ensure the realm is up and check your RNAME value.\n"); exit(1); } catch (nUnexpectedResponseException ure) { printf("The Nirvana REALM has returned an unexpected response.\n"); printf("Please ensure the Nirvana REALM and client API used are compatible.\n"); exit(1); } catch (nUnknownRemoteRealmException urre) { printf("The queue specified resided in a remote realm which could not be found.\n"); printf("Please ensure the queue name specified is correct.\n"); exit(1); } catch (nRequestTimedOutException rtoe) { printf("The requested operation has timed out waiting for a response from the REALM.\n"); printf("If this is a very busy REALM ask your administrator to increase the client timeout values.\n"); exit(1); } catch (nBaseClientException nbce) { printf("An error occured while creating the Channel Attributes object.\n"); exit(1); } catch (Exception e) { printf(e.displayText().c_str()); } //Close the session we opened try { nSessionFactory::close(m_pSession); } catch (Exception ex) { } //Close any other sessions within this program so that we can exit nSessionFactory::shutdown(); } int getType () { return 0; } protected: virtual void processArgs(int argc, char** argv) { // // Need a min of 2, rname, channel name if (argc < 3) { Usage(); exit(2); } std::string RNAME = argv[1]; std::string queueName = argv[2]; int time = 10000; int loglvl = 1; int reportCount = 1000; bool transactional = false; if (argc > 3) { loglvl = atoi(argv[3]); } if (argc > 4) { time = atoi(argv[4]); } if (argc > 5) { transactional = !strcmp(argv[5], "true"); } //Check for a selector message filter value if (argc > 6) { m_selector = argv[6]; } if (argc > 7) { m_reportCount = atoi(argv[7]); } //Process the local REALM RNAME details int nRproperty = 0; std::string *pRproperties = parseRealmProperties(RNAME, nRproperty); m_pSelf->doit(pRproperties, nRproperty, queueName, time, loglvl, reportCount, transactional, m_selector); } public: static int Main(int argc, char** argv) { //Create an instance for this class m_pSelf = new qreader(); m_pSelf->processArgs(argc, argv); return 0; } /** * This method is invoked from the queue receive thread each time an event is received from * the nirvana queue. * * @param pEvt An nConsumeEvent object containing the message received from the queue */ public: void go(nConsumeEvent *pEvt) { if (m_bIsTx) { ((nQueueSyncTransactionReader*)m_pReader)->commit(pEvt->getEventID()); } //If this is the first message we receive if (m_count == -1) { //Get a timestamp to be used for message rate calculations struct timeb tm; ftime (&tm); m_startTime = (unsigned long) tm.time * 1000 + tm.millitm; m_count = 0; } //Increment the counter m_count++; m_totalMsgs++; //Have we reached the point where we need to report the rates? if (m_count == m_reportCount) { //Reset the counter m_count = 0; //Get a timestampt to calculate the rates struct timeb tm; ftime (&tm); unsigned long end = (unsigned long) tm.time * 1000 + tm.millitm; // Does the specified log level permits us to print on the screen? if (m_logLevel >= 1) { //Dump the rates on the screen if (end != m_startTime) { unsigned long ts = end - m_startTime; printf("Received %d in %lu Evt/Sec = %lu Bytes/sec=%lu\n", m_reportCount, ts, (m_reportCount * 1000) / ts, (m_byteCount * 1000) / ts); printf("Bandwidth data : Bytes Tx [%llu] Bytes Rx [%llu]\n", m_pSession->getOutputByteCount(), m_pSession->getInputByteCount()); } else { printf("Received %d faster than the system millisecond counter\n", m_reportCount); } } //Set the startTime for the next report equal to the end timestamp for the previous one m_startTime = end; //Reset the byte counter m_byteCount = 0; } //If the last EID counter is not equal to the current event ID if (m_lastEID != pEvt->getEventID()) { //If yes, maybe we have missed an event, so print a message on the screen. //This message could be printed for a number of other reasons. //One of them would be someone purging a range creating an 'eid gap'. //As eids are never reused within a channel you could have a situation //where this gets printed but nothing is missed. printf ("Expired event range %lld - %lld\n", m_lastEID, (pEvt->getEventID() - 1)); //Reset the last eid counter m_lastEID = pEvt->getEventID() + 1; } else { //Increment the last eid counter m_lastEID++; } //Get the data of the message unsigned char *pBuffer = pEvt->getEventData(); int length = pEvt->getEventDataLength(); if (pBuffer != NULL) { //Add its length to the byte counter m_byteCount += length; } //If the loglevel permits printing on the screen if (m_logLevel >= 2) { //Print the eid printf("Event id : %lld\n", pEvt->getEventID()); if (pEvt->isEndOfChannel()) { printf("End of channel reached\n"); } //If the loglevel permits printing on the screen if (m_logLevel >= 3) { //Print the message tag printf("Event tag : %s\n", pEvt->getEventTag().c_str()); //Print the message data unsigned char *pBuffer = pEvt->getEventData(); int length = pEvt->getEventDataLength(); std::string eventData; nConstants::decode(pBuffer, length, eventData); printf("Event data : %s\n", eventData.c_str()); if (pEvt->hasAttributes()) { displayEventAttributes(pEvt->getAttributes()); } nEventProperties *pProp = pEvt->getProperties(); if (pProp != NULL) { displayEventProperties(pProp); } } } } public: void run() { while (m_bCont) { try { nConsumeEvent *pEvt = m_pReader->pop(m_timeout); if (pEvt != NULL) { go(pEvt); if(pEvt->delRef()) delete pEvt; } } catch(nSessionNotConnectedException ex) { break; } catch (Exception e) { printf("Unexpected exception in pop....exiting!\n"); printf(e.displayText().c_str()); exit(1); } } } /** * Prints the usage message for this class */ private: static void Usage() { printf("qreader [debug] [timeout] [transactional] [selecter] [count] \n\n"); printf(" \n\n"); printf(" - the rname of the server to connect to\n"); printf(" - Queue name to pop from\n"); printf("\n[Optional Arguments] \n\n"); printf("[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All\n"); printf("[timeout] - The timeout for the synchronous pop call\n"); printf("[transactional] - true / false whether the subscriber is transactional, if true, each event consumed will be ack'd to confirm receipt\n"); printf("[selector] - The event filter string to use\n"); printf("[count] - The number of events to wait before printing out summary information\n"); } }; qreader* qreader::m_pSelf = NULL; std::string qreader::m_selector; int main (int argc, char** argv) { return qreader::Main (argc, argv); }