/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ using System; using System.Collections.Generic; using System.Collections; using System.Linq; using System.Text; using com.pcbsys.nirvana.client; using System.Threading; namespace com.pcbsys.nirvana.apps { class responder : nSampleApp, nEventListener { private static responder myself; private nQueue respQueue; private nQueue reqQueue; private bool async = false; private bool transactional = false; private nQueueReader queueReader = null; String reqQueueName; String respQueueName; String rname; private static readonly UTF8Encoding encoding = new UTF8Encoding(); /** * @param args */ public static void Main(String[] args) { myself = new responder(); // Process input variables. myself.processArgs(args); responder.processEnvironmentVariables(); // Construct realm properties. String[] rproperties = new String[4]; rproperties = parseRealmProperties(myself.rname); myself.doit(rproperties, myself.reqQueueName, myself.respQueueName); } private void doit(String[] rproperties, String requestQueueName, String responseQueueName) { try { //Create session myself.constructSession(rproperties); //Connect to response queue. nChannelAttributes respAtr = new nChannelAttributes(); respAtr.setName(responseQueueName); respQueue = mySession.findQueue(respAtr); //Connect to request queue. nChannelAttributes reqAtr = new nChannelAttributes(); reqAtr.setName(requestQueueName); reqQueue = mySession.findQueue(reqAtr); setQueueReader(respQueue); if (async) { Console.WriteLine("Beginning to listen asynchronously..."); } else { //Set up a thread to process incoming synchronous events. Console.WriteLine("Beginning to listen synchronously..."); ThreadStart ts = new ThreadStart(run); Thread reader; reader = new Thread(ts); reader.IsBackground = true; reader.Start(); } Console.Read(); Console.WriteLine("Finished."); // Destroy the queue reader nQueue.destroyReader(queueReader); } catch (Exception ex) { ex.ToString(); } Environment.Exit(0); } // Set the appropriate type of queue listener. private void setQueueReader(nQueue respQueue) { try { if (async) { if (transactional) { Console.WriteLine("transational"); this.queueReader = reqQueue .createAsyncTransactionalReader(new nQueueReaderContext( this)); } else { this.queueReader = reqQueue .createAsyncTransactionalReader(new nQueueReaderContext( this)); } } else { if (transactional) { Console.WriteLine("transactional"); this.queueReader = reqQueue .createTransactionalReader(new nQueueReaderContext( this)); } else { this.queueReader = reqQueue .createTransactionalReader(new nQueueReaderContext( this)); } } } catch (Exception ) { Console.WriteLine("Could not create reader"); Environment.Exit(1); } } void processArgs(String[] args) { if (args.Length > 5) { Usage(); UsageEnv(); } else { switch (args.Length) { case 5: transactional = Convert.ToBoolean(args[4]); goto case 4; case 4: async = Convert.ToBoolean(args[3]); goto case 3; case 3: respQueueName = args[2]; goto case 2; case 2: reqQueueName = args[1]; goto case 1; case 1: if (args[0].Equals("-?")) { Usage(); UsageEnv(); } rname = args[0]; break; } } } private void Usage() { Console.WriteLine("Usage ...\n"); Console.WriteLine("nsubchan \n"); Console.WriteLine(" \n"); Console.WriteLine(" - Queue onto which request are published"); Console.WriteLine(" - Queue onto which responses are published"); Console.WriteLine(" - Channel name parameter for the channel to subscribe to"); Console.WriteLine("\n[Optional Arguments] \n"); Console.WriteLine("[asynchronous] - Whether to use asynchronous producing and consuming - true/false, default false."); Console.WriteLine("[transactional] - Whether to use transactional production and consumption of events - true/false, default false."); Console.WriteLine("\n\nNote: -? provides help on environment variables \n"); } public void go(nConsumeEvent req) { Console.WriteLine("Recieved request"); //Retrieve username of request sender. String requester = req.getPublishUser(); Console.WriteLine("Requester :" + requester); //Construct reply message. String text = "Response: " + encoding.GetString(req.getEventData()); Console.WriteLine("Reply:\"" + text + "\""); //Construct reply event nEventProperties atr = new nEventProperties(); nConsumeEvent resp = new nConsumeEvent("tag",encoding.GetBytes(text)); //Set recipient of the event to the requester's tag to reply. resp.setSubscriberName(encoding.GetBytes(requester)); try { if (transactional) { //Pack events transactionally if necessary (only one message in this simple example however). Console.WriteLine("Transactional"); List vEvents = new List(); nTransactionAttributes TXAttrib; try { TXAttrib = new nTransactionAttributes(respQueue, 1000); new nTransactionFactory(); nTransaction tx = nTransactionFactory.create(TXAttrib); vEvents.Add(resp); tx.publish(vEvents); tx.commit(); } catch (Exception ) { Console.WriteLine("Could not publish transaction."); Environment.Exit(1); } } else { //Otherwise simply publish the event. respQueue.push(resp); } } catch (Exception ) { Console.WriteLine("Could not publish to queue"); Environment.Exit(1); } Console.WriteLine("Published response"); } void run() { //Deal with synchronous events i.e. each client request. Console.WriteLine("Running Thread"); nConsumeEvent evt; while (true) { try { if (transactional) { evt = ((nQueueSyncTransactionReader) queueReader).pop(-1); } else { evt = ((nQueueSyncReader) queueReader).pop(-1); } if (evt != null) { go(evt); } } catch (Exception e) { Console.WriteLine("Exception in pop....exiting!"); Console.WriteLine(e.StackTrace); break; } } Environment.Exit(1); } } }