/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ package com.pcbsys.nirvana.nJMSApps; import com.pcbsys.foundation.utils.fEnvironment; import javax.jms.*; import javax.naming.Context; public class QBytesMessageSubscriber extends Subscriber { public QBytesMessageSubscriber(String factoryName, String destinationName) { super(factoryName, destinationName); } public static void main(String[] args) { System.out.println("START: QBytesMessageSubscriber"); //Check to see if args were specified if ((args.length == 1) && args[0].equals("-?")) { UsageEnv(); } final int min_args = 2; if (args.length < min_args) { Usage(); System.exit(1); } //Process Environment Variables processEnvironmentVariable("RNAME"); processEnvironmentVariable("PRINCIPAL"); processEnvironmentVariable("PASSWORD"); processEnvironmentVariable("CONTEXT_FACTORY"); processEnvironmentVariable("PROVIDER_URL"); processEnvironmentVariable("LOGLEVEL"); processEnvironmentVariable("HPROXY"); processEnvironmentVariable("HAUTH"); processEnvironmentVariable("CKEYSTORE"); processEnvironmentVariable("CKEYSTOREPASSWD"); processEnvironmentVariable("CAKEYSTORE"); processEnvironmentVariable("CAKEYSTOREPASSWD"); // Install any proxy server settings fEnvironment.setProxyEnvironments(); // Install any ssl settings fEnvironment.setSSLEnvironments(); String selector = null; int num_threads = 0; int maxrcv = 0; int argpos = 0; String factname = args[argpos++]; String destname = args[argpos++]; //Create an instance for this class QBytesMessageSubscriber subscriber = new QBytesMessageSubscriber(factname, destname); if (args.length > argpos) { num_threads = Integer.parseInt(args[argpos++]); } if (args.length > argpos) { subscriber.transacted = Boolean.parseBoolean(args[argpos++]); } if (args.length > argpos) { maxrcv = Integer.parseInt(args[argpos++]); } if (args.length > argpos) { selector = args[argpos++]; } if (args.length > argpos) { System.out.println("Excess arguments"); Usage(); System.exit(1); } System.out.println( "Mode=" + (num_threads == 0 ? "Async" : "Sync-" + num_threads) + ", Transacted=" + subscriber.transacted + ", Selector=" + selector); //Subscribe to the destination specified try { subscriber.doIt(factname, destname, num_threads, selector, maxrcv); } catch (Exception ex) { ex.printStackTrace(); } } private void doIt(String factoryName, String destName, int num_threads, String selector, int maxrcv) throws Exception { // get the initial context Context ctx = getInitialContext(); QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup(factoryName); System.out.println( "Sub: Context=" + ctx.getClass().getName() + ", ConnectionFactory=" + factoryName + "=>" + qcf.getClass() .getName()); // Create a Connection from the Connection Factory QueueConnection qc = qcf.createQueueConnection(); qc.setExceptionListener(this); // Create a Sesson from the Connection s = qc.createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE); // create the queue, and bind if necessary Queue q = (Queue) getDestination(ctx, s, destName); ctx.close(); System.out.println( "Sub: Connection=" + qc.getClass().getName() + ", Session=" + s.getClass().getName() + ", Dest=" + q.getClass() .getName()); if (num_threads == 0) { doItAsync(qc, q, selector); } else { doItSync(qc, q, selector, num_threads, maxrcv); } System.out.println("Sub: Closing connection"); qc.close(); s.close(); } /** * This method demonstrates the Nirvana JMS API calls necessary to subscribe to * a destination whether it is a topic or a queue. * It is called after all command line arguments have been received and * validated */ private void doItAsync(QueueConnection qc, Queue q, String selector) throws Exception { // Create a Subscriber from the Session MessageConsumer consumer = s.createConsumer(q, selector); System.out.println("Sub: Consumer=" + consumer.getClass().getName()); // Set the message listener to receive messages asynchronously consumer.setMessageListener(this); qc.start(); System.out.println("Ready - hit any key to exit"); System.in.read(); consumer.close(); } private void doItSync(QueueConnection qc, Queue q, String selector, int num_threads, int maxrcv) throws Exception { Thread[] threads = new Thread[num_threads]; for (int idx = 0; idx != num_threads; idx++) { SyncReceiver receiver = new SyncReceiver(idx + 1, s, q, selector, transacted, maxrcv); threads[idx] = new Thread(receiver); threads[idx].start(); } System.out.println("Sub: Created " + num_threads + " SyncReceivers with max-receive=" + maxrcv); qc.start(); for (int idx = 0; idx != num_threads; idx++) { threads[idx].join(); } } /** * A callback is received by the API to this method each time a message is received from * the destination. * * @param msg A JMS Message object */ @Override public void onMessage(Message msg) { count++; messageReceived("onMessage: Received msg #" + count, msg, s, transacted); } private static void messageReceived(String tag, Message msg, Session sess, boolean transacted) { String desc; try { desc = "ID=" + msg.getJMSMessageID() + ", Corr=" + msg.getJMSCorrelationID() + ", prop=" + msg .getIntProperty("MessageNumber"); } catch (Exception ex) { desc = ex.toString(); } System.out.println("Sub-" + tag + " - " + msg.getClass().getName() + " - " + desc); try { if (transacted) { sess.commit(); } } catch (Exception ex) { ex.printStackTrace(); } } /** * Prints the usage message for this class */ protected static void Usage() { System.out.println("Usage ...\n\n"); System.out.println("jmsbytesqsub \n"); System.out.println(" \n"); JMSClient.printFactoryNameUsage(); JMSClient.printDestinationNameUsage(); System.out.println(" - Zero for async mode, else the number of synchronous receiver threads"); System.out.println(" - Whether the session is transacted"); System.out.println(" - Max messages to receive in synchronous mode"); System.out.println(" - An optional message selector"); System.out.println("\n\nNote: -? provides help on environment variables \n"); } private static class SyncReceiver implements Runnable { private final String tag; private final Session sess; private final MessageConsumer consumer; private final boolean transacted; private final int maxrcv; public SyncReceiver(int num, Session s, Queue q, String selector, boolean t, int max) throws JMSException { tag = "Receiver-" + num; sess = s; consumer = sess.createConsumer(q, selector); transacted = t; maxrcv = max; } @Override public void run() { for (int msgcnt = 1; maxrcv == 0 || msgcnt <= maxrcv; msgcnt++) { try { Message msg = consumer.receive(); messageReceived(tag + ": Received msg #" + msgcnt, msg, sess, transacted); } catch (Exception ex) { ex.printStackTrace(); } } try { consumer.close(); } catch (Exception ex) { ex.printStackTrace(); } } } }