/* Copyright 1999-2011 (c) My-Channels Copyright (c) 2012-2014 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. */ package com.pcbsys.nirvana.apps; import com.pcbsys.nirvana.client.*; import java.util.Vector; /** * Publishes transactionally to a nirvana channel */ public class txpublish extends nSampleApp { private static txpublish mySelf = null; /** * This method demonstrates the Nirvana API calls necessary to publish to a * channel transactionally. It is called after all command line arguments * have been received and validated * * @param realmDetails * a String[] containing the possible RNAME values * @param achannelName * the channel name to publish to * @param count * the number of messages to publish * @param size * the size (bytes) of each message to be published * @param txSize * the number of messages to be published in each transaction */ private void doit(String[] realmDetails, String achannelName, int count, int size, int txSize) { mySelf.constructSession(realmDetails); // Publishes to the specified channel try { // Create a channel attributes object nChannelAttributes nca = new nChannelAttributes(); nca.setName(achannelName); // Obtain a reference to the channel nChannel myChannel = mySession.findChannel(nca); // Create a byte array filled with characters equal to // the message size specified. This could be a result // of String.getBytes() call in a real world scenario. byte[] buffer = new byte[size]; for (int x = 0; x < size; x++) { buffer[x] = (byte) ((x % 90) + 32); } // Create a vector that contains the messages to be published on // each // transactional publish call Vector vevents = new Vector(); for (int x = 0; x < txSize; x++) { nEventProperties props = new nEventProperties(); nConsumeEvent evt = new nConsumeEvent(props, new String("TXID-" + x).getBytes()); // Note the Tag does NOT need the TX vevents.addElement(evt); } // Inform the user that publishing is about to start System.out.println("Starting publish of " + count + " events of size " + size); // Obtain the channel's last event ID prior to us publishing // anything long startEid = myChannel.getLastEID(); // Get a timestamp to be used to calculate the message publishing // rates long start = System.currentTimeMillis(); // Create a transaction attributes object based on the channel we // wish to publish to nTransactionAttributes TXAttrib = new nTransactionAttributes( myChannel, 1000); // Define a transaction object nTransaction tx = null; // Loop as many times as the number of messages we want to publish // divided by the number of events per transaction for (int x = 0; x < ((int) (count / txSize)); x++) { try { // Create a new transaction object tx = nTransactionFactory.create(TXAttrib); // Publish the vector of messages create previously tx.publish(vevents); // Commit the transaction tx.commit(); } catch (nSessionNotConnectedException ex) { while (!mySession.isConnected()) { System.out .println("Disconnected from Nirvana, Sleeping for 1 second..."); try { Thread.sleep(1000); } catch (InterruptedException e) { } } if (tx != null) { try { tx.commit(); System.out.println("Commited tx after failover..."); } catch (Exception e) { System.out .println("Got error trying to commit transaction as it had started on another Nirvana realm, republishing event.."); x--; } } else { System.out .println("Got error trying to commit transaction as it had started on another Nirvana realm, republishing event.."); x--; } } } // Calculate the actual number of events published by obtaining // the channel's last eid after our publishing and subtracting // the channel's last eid before our publishing. // This also ensures that all client queues have been flushed. long events = myChannel.getLastEID() - startEid; // Get a timestamp to calculate the publishing rates long end = System.currentTimeMillis(); // Calculate the events / sec rate long eventPerSec = (((events) * 1000) / ((end + 1) - start)); // Calculate the bytes / sec rate long bytesPerSec = eventPerSec * size; // Inform the user of the resulting rates System.out.println("Events = " + events + " Events/sec = " + eventPerSec + " Bytes/Sec = " + bytesPerSec); } // Handle errors catch (nChannelNotFoundException cnfe) { System.out.println("The channel specified could not be found."); System.out .println("Please ensure that the channel exists in the REALM you connect to."); cnfe.printStackTrace(); System.exit(1); } catch (nSecurityException se) { System.out .println("Insufficient permissions for the requested operation."); System.out.println("Please check the ACL settings on the server."); se.printStackTrace(); System.exit(1); } catch (nSessionNotConnectedException snce) { System.out .println("The session object used is not physically connected to the Nirvana realm."); System.out .println("Please ensure the realm is up and check your RNAME value."); snce.printStackTrace(); System.exit(1); } catch (nUnexpectedResponseException ure) { System.out .println("The Nirvana REALM has returned an unexpected response."); System.out .println("Please ensure the Nirvana REALM and client API used are compatible."); ure.printStackTrace(); System.exit(1); } catch (nUnknownRemoteRealmException urre) { System.out .println("The channel specified resided in a remote realm which could not be found."); System.out .println("Please ensure the channel name specified is correct."); urre.printStackTrace(); System.exit(1); } catch (nRequestTimedOutException rtoe) { System.out .println("The requested operation has timed out waiting for a response from the REALM."); System.out .println("If this is a very busy REALM ask your administrator to increase the client timeout values."); rtoe.printStackTrace(); System.exit(1); } catch (nTransactionAlreadyCommittedException tace) { System.out .println("The transaction you attempted to commit has already been commited on this REALM."); tace.printStackTrace(); System.exit(1); } catch (nTransactionNotStartedException tnse) { System.out .println("The transaction you attempted to commit has not yet started on this REALM."); tnse.printStackTrace(); System.exit(1); } catch (nBaseClientException nbce) { System.out .println("An error occured while creating the Channel Attributes object."); nbce.printStackTrace(); System.exit(1); } // Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ex) { } // Close any other sessions within this JVM so that we can exit nSessionFactory.shutdown(); } protected void processArgs(String[] args) { switch (args.length) { case 4: System.getProperties().put("TXSIZE", args[3]); case 3: System.getProperties().put("SIZE", args[2]); case 2: System.getProperties().put("COUNT", args[1]); case 1: if (args[0].equals("-?")) { Usage(); UsageEnv(); } System.getProperties().put("CHANNAME", args[0]); break; } } public static void main(String[] args) { // Create an instance for this class mySelf = new txpublish(); // Process command line arguments mySelf.processArgs(args); // Process Environment Variables nSampleApp.processEnvironmentVariables(); // Check the channel name specified String channelName = null; if (System.getProperty("CHANNAME") != null) channelName = System.getProperty("CHANNAME"); else { Usage(); System.exit(1); } int count = 10; // Default value // Check if the number of messages to be published was specified if (System.getProperty("COUNT") != null) { try { count = Integer.parseInt(System.getProperty("COUNT")); } catch (Exception num) { } // Ignore and use the defaults } int size = 100; // Default value (bytes) // Check if the size (bytes) of each message has been specified if (System.getProperty("SIZE") != null) { try { size = Integer.parseInt(System.getProperty("SIZE")); } catch (Exception num) { } // Ignore and use the default } int txSize = 1; // Default value // Check if the number of messages per transaction has been specified if (System.getProperty("TXSIZE") != null) { try { txSize = Integer.parseInt(System.getProperty("TXSIZE")); } catch (Exception num) { } // Ignore and use the default } // Check the local realm details String RNAME = null; if (System.getProperty("RNAME") != null) RNAME = System.getProperty("RNAME"); else { Usage(); System.exit(1); } // Process the local REALM RNAME details String[] rproperties = new String[4]; rproperties = parseRealmProperties(RNAME); // Publish to the channel specified mySelf.doit(rproperties, channelName, count, size, txSize); } /** * Prints the usage message for this class */ private static void Usage() { System.out.println("Usage ...\n"); System.out .println("npubtxchan [count] [size] [tx size] \n"); System.out.println(" \n"); System.out .println(" - Channel name parameter for the channel to publish to"); System.out.println("\n[Optional Arguments] \n"); System.out .println("[count] -The number of events to publish (default: 10)"); System.out .println("[size] - The size (bytes) of the event to publish (default: 100)"); System.out .println("[tx size] - The number of events per transaction (default: 1)"); System.out .println("\n\nNote: -? provides help on environment variables \n"); } } // End of txpublish Class