/*
 * Copyright 1999-2011 (c) My-Channels
 * Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA,
 * and/or its subsidiaries and/or its affiliates and/or their licensors.
 * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided
 * for in your License Agreement with Software AG.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using com.pcbsys.nirvana.client;
using System.Threading;
using System.Collections;

namespace com.pcbsys.nirvana.apps
{
    /// <summary>
    /// Sample application that pushes a single "Request" event onto a request
    /// queue and prints every event it reads back from a response queue.
    /// Responses are consumed either through an asynchronous reader (this
    /// class is passed as the listener) or by a background thread that pops
    /// events synchronously; both modes optionally use transactional readers.
    /// </summary>
    class requester : nSampleApp, nEventListener
    {
        // Number of leading command line arguments that must always be given:
        // rname, request queue, response queue and username.
        private const int RequiredArgumentCount = 4;

        // Upper bound on the number of arguments (required + the two optional flags).
        private const int MaximumArgumentCount = 6;

        private static requester myself;
        private bool async = false;
        private nQueue reqQueue;
        private nQueue respQueue;
        private bool transactional = false;
        private nQueueReader queueReader;
        private String reqQueueName;
        private String respQueueName;
        private String rname;
        private byte[] username;
        private static readonly UTF8Encoding encoding = new UTF8Encoding();

        /// <summary>
        /// Program entry point: parses the arguments, builds the realm
        /// properties and runs the request/response exchange.
        /// </summary>
        /// <param name="args">
        /// rname, request queue, response queue, username and the optional
        /// asynchronous / transactional flags, in that order.
        /// </param>
        public static void Main(String[] args)
        {
            myself = new requester();

            // Process input variables. processArgs terminates the process when the
            // required arguments are missing, so username is non-null from here on.
            myself.processArgs(args);
            requester.processEnvironmentVariables();

            // Construct realm properties.
            Console.WriteLine("username: " + encoding.GetString(myself.username));
            String[] rproperties = parseRealmProperties(myself.rname);

            myself.doit(rproperties, myself.reqQueueName, myself.respQueueName);
        }

        /// <summary>
        /// Copies the command line arguments into the instance fields.
        /// Prints the usage text and exits when help is requested ("-?"),
        /// when the argument count is outside the accepted range, or when an
        /// optional flag is not a valid boolean.
        /// </summary>
        /// <param name="args">The raw command line arguments.</param>
        void processArgs(String[] args)
        {
            if (args.Length > 0 && args[0].Equals("-?"))
            {
                ExitWithUsage(0);
            }

            // Without all required arguments the queue names and username stay
            // null and the application would fail later with a null reference.
            if (args.Length < RequiredArgumentCount || args.Length > MaximumArgumentCount)
            {
                ExitWithUsage(1);
            }

            rname = args[0];
            reqQueueName = args[1];
            respQueueName = args[2];
            username = encoding.GetBytes(args[3]);

            if (args.Length > 4)
            {
                this.async = ParseFlag(args[4], "asynchronous");
            }
            if (args.Length > 5)
            {
                this.transactional = ParseFlag(args[5], "transactional");
            }
        }

        /// <summary>
        /// Parses an optional true/false argument; prints the usage text and
        /// exits when the value is not a valid boolean.
        /// </summary>
        /// <param name="value">The argument text as typed by the user.</param>
        /// <param name="argumentName">Name shown in the error message.</param>
        /// <returns>The parsed flag.</returns>
        private bool ParseFlag(String value, String argumentName)
        {
            bool flag;
            if (!Boolean.TryParse(value, out flag))
            {
                Console.WriteLine("Invalid value \"" + value + "\" for [" + argumentName + "] - expected true or false.\n");
                ExitWithUsage(1);
            }
            return flag;
        }

        /// <summary>
        /// Prints the command line and environment usage text, then
        /// terminates the process.
        /// </summary>
        /// <param name="exitCode">Process exit code to report.</param>
        private void ExitWithUsage(int exitCode)
        {
            Usage();
            // UsageEnv is inherited; whether it terminates the process itself is
            // not visible from this file, so exit explicitly afterwards.
            UsageEnv();
            Environment.Exit(exitCode);
        }

        /// <summary>
        /// Prints the command line usage of this sample. The argument order
        /// shown here mirrors the order read by processArgs.
        /// </summary>
        private void Usage()
        {
            Console.WriteLine("Usage ...\n");
            Console.WriteLine("requester <rname> <request queue> <response queue> <username> [asynchronous] [transactional]\n");
            Console.WriteLine("<Required Arguments> \n");
            Console.WriteLine("<rname> - RNAME of the realm to connect to");
            Console.WriteLine("<request queue> - Queue onto which requests are published");
            Console.WriteLine("<response queue> - Queue onto which responses are published");
            Console.WriteLine("<username> - the tag to identify this requester by.");
            Console.WriteLine("\n[Optional Arguments] \n");
            Console.WriteLine("[asynchronous] - Whether to use asynchronous producing and consuming - true/false, default false.");
            Console.WriteLine("[transactional] - Whether to use transactional production and consumption of events - true/false, default false.");
            Console.WriteLine("\n\nNote: -? provides help on environment variables \n");
        }

        /// <summary>
        /// Connects to the realm, attaches a reader to the response queue,
        /// publishes one request event and then waits for user input before
        /// destroying the reader and exiting the process.
        /// </summary>
        /// <param name="rproperties">Realm properties used to build the session.</param>
        /// <param name="requestQueueName">Name of the queue the request is pushed onto.</param>
        /// <param name="responseQueueName">Name of the queue responses are read from.</param>
        public void doit(String[] rproperties, String requestQueueName, String responseQueueName)
        {
            try
            {
                constructSession(rproperties);

                // Connect to response queue.
                nChannelAttributes respAtr = new nChannelAttributes();
                respAtr.setName(responseQueueName);
                respQueue = mySession.findQueue(respAtr);

                // Connect to request queue.
                nChannelAttributes reqAtr = new nChannelAttributes();
                reqAtr.setName(requestQueueName);
                reqQueue = mySession.findQueue(reqAtr);

                Thread.Sleep(1000);

                // Create initial request.
                nConsumeEvent request = new nConsumeEvent("tag", encoding.GetBytes("Request"));
                Console.WriteLine(request.getEventTag());
                Console.WriteLine(encoding.GetString(request.getEventData()));

                setQueueReader(respQueue);

                if (async)
                {
                    // The asynchronous reader was created with this object as its
                    // listener, so no extra thread is started here.
                    Console.WriteLine("Beginning to listen asynchronously...");
                }
                else
                {
                    // Set up a thread to process incoming synchronous events.
                    Console.WriteLine("Beginning to listen synchronously...");
                    Thread reader = new Thread(new ThreadStart(run));
                    reader.IsBackground = true;
                    reader.Start();
                }

                if (transactional)
                {
                    // Construct the transactional set of events (only one message
                    // in this example) and commit it.
                    Console.WriteLine("Transactional");
                    List<nConsumeEvent> vEvents = new List<nConsumeEvent>();
                    nTransactionAttributes TXAttrib = new nTransactionAttributes(reqQueue, 1000);
                    nTransaction tx = nTransactionFactory.create(TXAttrib);
                    vEvents.Add(request);
                    tx.publish(vEvents);
                    tx.commit();
                }
                else
                {
                    // Otherwise simply publish the event.
                    reqQueue.push(request);
                }

                Console.WriteLine("Published request \"Request\"");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.StackTrace);
            }

            // Exit on user input.
            Console.Read();

            // Destroy the reader.
            try
            {
                nQueue.destroyReader(queueReader);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Already disconected.");
                Console.WriteLine(ex.StackTrace);
            }

            Console.WriteLine("Finished");
            Environment.Exit(0);
        }

        /// <summary>
        /// Creates the queue reader matching the async / transactional flags
        /// and stores it in queueReader. Exits the process when the reader
        /// cannot be created.
        /// </summary>
        /// <param name="respQueue">Queue the reader is created on.</param>
        private void setQueueReader(nQueue respQueue)
        {
            try
            {
                nQueueReaderContext context = new nQueueReaderContext(this);

                if (async)
                {
                    if (transactional)
                    {
                        queueReader = respQueue.createAsyncTransactionalReader(context);
                    }
                    else
                    {
                        queueReader = respQueue.createAsyncReader(context);
                    }
                }
                else
                {
                    if (transactional)
                    {
                        queueReader = respQueue.createTransactionalReader(context);
                    }
                    else
                    {
                        queueReader = respQueue.createReader(context);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Could not create reader");
                Console.WriteLine(ex.StackTrace);
                Environment.Exit(1);
            }
        }

        /// <summary>
        /// Constructs and initialises a session using the user name supplied
        /// on the command line. Any failure is reported on the console and
        /// terminates the process with exit code 1.
        /// </summary>
        /// <param name="realmDetails">Realm properties for the session attributes.</param>
        void constructSession(String[] realmDetails)
        {
            // Create a realm session attributes object from the array of strings.
            try
            {
                nsa = new nSessionAttributes(realmDetails, 2);
            }
            catch (Exception)
            {
                Console.WriteLine("Error creating Session Attributes. Please check your RNAME");
                Environment.Exit(1);
            }

            try
            {
                // Create a session object from the session attributes object,
                // passing this as a reconnect handler class (optional). This will
                // ensure that the reconnection methods will get called by the API.
                String usernameString = encoding.GetString(username);
                mySession = nSessionFactory.create(nsa, this, usernameString);

                // Add this class as an asynchronous exception listener.
                mySession.addAsyncExceptionListener(this);
            }
            catch (nIllegalArgumentException ex)
            {
                // Continuing without a usable session would only fail later in
                // init(), so report the problem and stop here.
                Console.WriteLine("Illegal argument supplied when creating the session.");
                Console.WriteLine(ex.StackTrace);
                Environment.Exit(1);
            }

            // Initialise the Nirvana session. This physically opens the connection
            // to the Nirvana realm, using the specified protocols. If multiple
            // interfaces are supported these will be attempted in weight order
            // (SSL, HTTPS, socket, HTTP).
            try
            {
                mySession.init();
                mySession.updateConnectionListWithServerList();
            }
            // Handle errors
            catch (nSecurityException sec)
            {
                Console.WriteLine("The current user is not authorised to connect to the specified Realm Server");
                Console.WriteLine("Please check the realm acls or contact support");
                Console.WriteLine(sec.StackTrace);
                Environment.Exit(1);
            }
            catch (nRealmUnreachableException rue)
            {
                Console.WriteLine("The Nirvana Realm specified by the RNAME value is not reachable.");
                Console.WriteLine("Please ensure the Realm is running and check your RNAME value.");
                Console.WriteLine(rue.StackTrace);
                Environment.Exit(1);
            }
            catch (nSessionNotConnectedException snce)
            {
                Console.WriteLine("The session object used is not physically connected to the Nirvana Realm.");
                Console.WriteLine("Please ensure the Realm is up and check your RNAME value.");
                Console.WriteLine(snce.StackTrace);
                Environment.Exit(1);
            }
            catch (nSessionAlreadyInitialisedException ex)
            {
                Console.WriteLine("The session object has already been initialised.");
                Console.WriteLine("Please make only one call to the .init() function.");
                Console.WriteLine(ex.StackTrace);
                Environment.Exit(1);
            }
        }

        /// <summary>
        /// Prints the payload of a response event. Called directly by run()
        /// in synchronous mode; presumably also invoked by the API for the
        /// asynchronous readers, which are created with this object as their
        /// listener (confirm against the nEventListener contract).
        /// </summary>
        /// <param name="evt">The response event that was received.</param>
        public void go(nConsumeEvent evt)
        {
            // Deal with the response.
            Console.WriteLine("Recieved Response :\"" + (encoding.GetString(evt.getEventData())) + "\"");
        }

        /// <summary>
        /// Thread body for synchronous mode: repeatedly pops events from the
        /// reader and hands each non-null event to go(). The first exception
        /// ends the loop and terminates the process with exit code 1.
        /// </summary>
        void run()
        {
            // Deal with synchronous events i.e. the server response.
            Console.WriteLine("Running Thread");

            while (true)
            {
                try
                {
                    nConsumeEvent evt;
                    if (transactional)
                    {
                        evt = ((nQueueSyncTransactionReader)queueReader).pop(-1);
                    }
                    else
                    {
                        evt = ((nQueueSyncReader)queueReader).pop(-1);
                    }

                    if (evt != null)
                    {
                        go(evt);
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Exception in pop....exiting!");
                    Console.WriteLine(e.StackTrace);
                    break;
                }
            }

            Environment.Exit(1);
        }
    }
}