/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ package com.pcbsys.nirvana.apps; import java.util.Vector; import com.pcbsys.nirvana.client.*; public class responder extends nSampleApp implements nEventListener, Runnable { private static responder myself; private nQueue respQueue; private nQueue reqQueue; private boolean async = false; private boolean transactional = false; private nQueueReader queueReader = null; public static void main(String[] args) { myself = new responder(); // Process input variables. myself.processArgs(args); responder.processEnvironmentVariables(); String reqQueueName = System.getProperty("REQUESTQUEUENAME"); String respQueueName = System.getProperty("RESPONSEQUEUENAME"); if (System.getProperty("ASYNC") != null) { myself.async = Boolean.valueOf(System.getProperty("ASYNC")); } if (System.getProperty("TRANSACTIONAL") != null) { myself.transactional = Boolean.valueOf(System.getProperty("TRANSACTIONAL")); } // Construct realm properties. String[] rproperties = new String[4]; rproperties = parseRealmProperties(System.getProperty("RNAME")); myself.doit(rproperties, reqQueueName, respQueueName); } private void doit(String[] rproperties, String requestQueueName, String responseQueueName) { try { //Create session myself.constructSession(rproperties); //Connect to response queue. nChannelAttributes respAtr = new nChannelAttributes(); respAtr.setName(responseQueueName); respQueue = mySession.findQueue(respAtr); //Connect to request queue. nChannelAttributes reqAtr = new nChannelAttributes(); reqAtr.setName(requestQueueName); reqQueue = mySession.findQueue(reqAtr); setQueueReader(respQueue); if (async) { System.out.println("Beginning to listen asynchronously..."); } else { //Set up a thread to process incoming synchronous events. System.out.println("Beginning to listen synchronously..."); Thread reader = new Thread(this); reader.setDaemon(true); reader.start(); } System.in.read(); System.out.println("Finished."); // Destroy the queue reader nQueue.destroyReader(queueReader); } catch (Exception ex) { ex.printStackTrace(); } System.exit(0); } // Set the appropriate type of queue listener. private void setQueueReader(nQueue respQueue) { try { if (async) { if (transactional) { System.out.println("transational"); this.queueReader = reqQueue.createAsyncTransactionalReader(new nQueueReaderContext(this)); } else { this.queueReader = reqQueue.createAsyncTransactionalReader(new nQueueReaderContext(this)); } } else { if (transactional) { System.out.println("transactional"); this.queueReader = reqQueue.createTransactionalReader(new nQueueReaderContext(this)); } else { this.queueReader = reqQueue.createTransactionalReader(new nQueueReaderContext(this)); } } } catch (Exception ex) { System.out.println("Could not create reader"); System.exit(1); } } @Override protected void processArgs(String[] args) { if (args.length > 5) { Usage(); UsageEnv(); } else { switch (args.length) { case 4: System.getProperties().put("TRANSACTIONAL", args[3]); case 3: System.getProperties().put("ASYNC", args[2]); case 2: System.getProperties().put("RESPONSEQUEUENAME", args[1]); case 1: if (args[0].equals("-?")) { Usage(); UsageEnv(); } System.getProperties().put("REQUESTQUEUENAME", args[0]); } } } private void Usage() { System.out.println("Usage ...\n"); System.out.println("responder \n"); System.out.println(" \n"); System.out.println(" - Queue onto which request are published"); System.out.println(" - Queue onto which responses are published"); System.out.println("\n[Optional Arguments] \n"); System.out .println("[asynchronous] - Whether to use asynchronous producing and consuming - true/false, default false."); System.out.println( "[transactional] - Whether to use transactional production and consumption of events - true/false, default false."); System.out.println("\n\nNote: -? provides help on environment variables \n"); } public void go(nConsumeEvent req) { System.out.println("Recieved request"); //Retrieve username of request sender. String requester = req.getPublishUser(); System.out.println("Requester :" + requester); //Construct reply message. String text = "Response: " + new String(req.getEventData()); System.out.println("Reply:\"" + text + "\""); //Construct reply event nEventProperties atr = new nEventProperties(); nConsumeEvent resp = new nConsumeEvent(atr, text.getBytes()); //Set recipient of the event to the requester's tag to reply. resp.setSubscriberName(requester.getBytes()); try { if (transactional) { //Pack events transactionally if necessary (only one message in this simple example however). System.out.println("Transactional"); Vector vEvents = new Vector(); nTransactionAttributes TXAttrib; try { TXAttrib = new nTransactionAttributes(respQueue, 1000); new nTransactionFactory(); nTransaction tx = nTransactionFactory.create(TXAttrib); vEvents.addElement(resp); tx.publish(vEvents); tx.commit(); } catch (Exception e) { System.out.println("Could not publish transaction."); System.exit(1); } } else { //Otherwise simply publish the event. respQueue.push(resp); } } catch (Exception e) { System.out.println("Could not publish to queue"); System.exit(1); } System.out.println("Published response"); } public void run() { //Deal with synchronous events i.e. each client request. System.out.println("Running Thread"); nConsumeEvent evt; while (true) { try { if (transactional) { evt = ((nQueueSyncTransactionReader) queueReader).pop(-1); } else { evt = ((nQueueSyncReader) queueReader).pop(-1); } if (evt != null) { go(evt); } } catch (Exception e) { System.out.println("Exception in pop....exiting!"); e.printStackTrace(); break; } } System.exit(1); } }