/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ package com.pcbsys.nirvana.apps; import com.pcbsys.nirvana.client.*; import java.util.*; /** * nDataGroup publisher sample application * Will create a group specified by the parameter passed on the command line. All known data stream connections (except * itself) will * then be added to the created group when callback is received for new stream connections. When no stream connections * exist, publishing stops. * When streams exist, publishing to the created group commences. */ public class DataGroupPublisher extends nSampleApp implements nDataGroupListener { private boolean isOk = true; private nBaseClientException asyncException; private nDataGroup myGroup = null; private volatile boolean hasStreams = false; private Object mutex = new Object(); private static DataGroupPublisher mySelf = null; /** * This method demonstrates the Nirvana API calls necessary to publish to * an nDataGroup. * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param groupName the data group name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published */ private void doit(String[] realmDetails, String groupName, boolean isMulticast, boolean isConflated, int count, int size, int conflationType, int conflationInterval) throws Exception { mySelf.constructSession(realmDetails); try { nDataGroup[] grps = mySession.getDataGroups(this); Thread.sleep(100); // if group not found, create it if (myGroup == null) { if (isConflated) { //Create a conflation attributes based on given parameters nConflationAttributes conflationAttributes = new nConflationAttributes(conflationType, conflationInterval); myGroup = mySession.createDataGroup(groupName, this, conflationAttributes, isMulticast, true); } else { //Create a standard data group myGroup = mySession.createDataGroup(groupName, isMulticast); } } // get the default group to determine how many stream users are connected nDataGroup defaultGroup = mySession.getDefaultDataGroup(); // just add each connected stream to the group for (Iterator i = defaultGroup.getStreams(); i.hasNext(); ) { nDataStream stream = i.next(); if (!stream.getName().equals(mySession.getStreamId())) { myGroup.add(stream); } } hasStreams = myGroup.size() > 0; //Create a byte array filled with characters equal to // the message size specified. This could be a result //of String.getBytes() call in a real world scenario. byte[] buffer = new byte[size]; for (int x = 0; x < size; x++) { buffer[x] = (byte) ((x % 90) + 32); } //Inform the user that publishing is about to start System.out.println("Starting publish of " + count + " events with a size of " + size + " bytes each"); //Get a timestamp to be used to calculate the message publishing rates long start = System.currentTimeMillis(); //Loop as many times as the number of messages we want to publish for (int x = 0; x < count; x++) { while (!hasStreams) { synchronized (mutex) { System.out.println("No streams are registered, waiting for notification"); mutex.wait(); if (hasStreams) { System.out.println("Streams are registered starting to publish"); } } } try { //Instantiate the message to be published with the specified nEventPropeties / byte[] //nEventProperties props = new nEventProperties(); //You can add other types in a dictionary object //props.put("key0string"+x, "1"+x); //props.put("key1int", (int) 1); //props.put("key2long", (long) -11); nConsumeEvent evt1 = new nConsumeEvent((byte[]) null, buffer); //Publish the event mySession.writeDataGroup(evt1, myGroup); } catch (nSessionNotConnectedException ex) { while (!mySession.isConnected()) { System.out.println("Disconnected from Nirvana, Sleeping for 1 second..."); try { Thread.sleep(1000); } catch (Exception e) { } } x--; //We need to repeat the publish for the event publish that caused the exception, //so we reduce the counter } catch (nBaseClientException ex) { System.out.println("Publish.cs : Exception : " + ex.getMessage()); throw ex; } //Check if an asynchronous exception has been received if (!isOk) { //If it has, then throw it throw asyncException; } } //Check if an asynchronous exception has been received if (!isOk) { //If it has, then throw it throw asyncException; } long end = System.currentTimeMillis(); //Get a timestamp to calculate the publishing rates //Calculate the events / sec rate long dif = end - start; long eventPerSec = count; eventPerSec = (eventPerSec * 1000) / dif; //Calculate the bytes / sec rate long bytesPerSec = eventPerSec * size; //Inform the user of the resulting rates System.out.println("Publish Completed in " + dif + "ms :"); System.out.println( "[Events Published = " + count + "] [Events/Second = " + eventPerSec + "] [Bytes/Second = " + bytesPerSec + "]"); System.out.println("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession .getInputByteCount() + "]"); } catch (nSessionPausedException ps) { System.out.println("Session has been paused, please resume the session"); System.exit(1); } catch (nSecurityException se) { System.out.println("Insufficient permissions for the requested operation."); System.out.println("Please check the ACL settings on the server."); System.exit(1); } catch (nSessionNotConnectedException snce) { System.out.println("The session object used is not physically connected to the Nirvana realm."); System.out.println("Please ensure the realm is up and check your RNAME value."); System.exit(1); } catch (nUnexpectedResponseException ure) { System.out.println("The Nirvana REALM has returned an unexpected response."); System.out.println("Please ensure the Nirvana REALM and client API used are compatible."); System.exit(1); } catch (nRequestTimedOutException rtoe) { System.out.println("The requested operation has timed out waiting for a response from the REALM."); System.out.println("If this is a very busy REALM ask your administrator to increase the client timeout values."); System.exit(1); } //Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ex) { } //Close any other sessions so that we can exit nSessionFactory.shutdown(); } // callback for DataStreamListener, not used since the publisher is not added to any user defined data group public void onMessage(nConsumeEvent evt) { } // new stream added to default data group public void addedStream(nDataGroup group, nDataStream stream, int count) { System.out.println( group.getName() + " : New stream " + stream.getName() + " : " + stream.getSubject() + " added, size now " + count); if (group.getName().equals(myGroup.getName()) && count > 0) { hasStreams = true; System.out.println(group.getName() + " : " + count + " Streams are registered, starting publisher thread"); synchronized (mutex) { mutex.notifyAll(); } } else { if (!stream.getSubject().contains("admin") && !stream.getName().equals(mySession.getStreamId())) { try { myGroup.add(stream); } catch (Exception e) { e.printStackTrace(); } } } } // stream removed from data group public void deletedStream(nDataGroup group, nDataStream stream, int count, boolean serverRemoved) { System.out.println( group.getName() + " : Data stream " + stream.getName() + " : " + stream.getSubject() + " removed, size now " + count); if (group.getName().equals(myGroup.getName()) && count == 0) { hasStreams = false; System.out.println("No streams are registered, stopping publisher thread"); } else if (group.getName().equals(myGroup.getName()) && count > 0) { hasStreams = true; System.out.println(group.getName() + " : " + count + " Streams are registered, starting publisher thread"); } } // new data group added public void addedGroup(nDataGroup to, nDataGroup group, int count) { System.out.println(to.getName() + " : Data group " + group.getName() + " added, size now " + count); } // new data group created public void createdGroup(nDataGroup group) { System.out.println("Data group " + group.getName() + " created"); } // data group deleted public void deletedGroup(nDataGroup group) { System.out.println("Data group " + group.getName() + " deleted"); } // data group removed public void removedGroup(nDataGroup from, nDataGroup group, int count) { System.out.println(from.getName() + " : Data group " + group.getName() + " removed, size now " + count); } protected void processArgs(String[] args) { try { // Process Environment Variables nSampleApp.processEnvironmentVariables(); // Need a min of 1, group name if (args.length < 2) { Usage(); System.exit(2); } String groupName = args[0]; int count = 1000; if (args.length > 1) { count = Integer.parseInt(args[1]); } int size = 1024; if (args.length > 2) { size = Integer.parseInt(args[2]); } boolean enableMulticast = true; if (args.length > 3) { enableMulticast = Boolean.parseBoolean(args[3]); } boolean enableConflation = false; if (args.length > 4) { enableConflation = Boolean.parseBoolean(args[4]); } int conflationType = nConflationAttributes.sMergeEvents; if (args.length > 5) { if (args[5].equals("merge")) { conflationType = nConflationAttributes.sMergeEvents; } else if (args[5].equals("drop")) { conflationType = nConflationAttributes.sDropEvents; } } int conflationInterval = 500; if (args.length > 6) { conflationInterval = Integer.parseInt(args[6]); } // Check the local realm details String RNAME = null; if (System.getProperty("RNAME") != null) { RNAME = System.getProperty("RNAME"); } else { Usage(); System.exit(1); } // // Run the sample app // mySelf .doit(parseRealmProperties(RNAME), groupName, enableMulticast, enableConflation, count, size, conflationType, conflationInterval); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { //Create an instance for this class mySelf = new DataGroupPublisher(); //Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { System.out.println("Usage ...\n"); System.out.println( "DataGroupPublish [count] [size] [enable multicast] [conflate] [conflation merge or drop] [conflation interval \n"); System.out.println(" \n"); System.out.println(" - Data group name parameter to publish to"); System.out.println("\n[Optional Arguments] \n"); System.out.println("[count] -The number of events to publish (default: 10)"); System.out.println("[size] - The size (bytes) of the event to publish (default: 100)"); System.out.println("[enable multicast] - enable the data group for multicast delivery"); System.out.println("[conflate] - enable conflation true or false"); System.out.println("[conflation merge or drop] - merge to enable merge or drop to enable drop (default: merge)"); System.out.println("[conflation interval] - the interval for conflation to publish(default: 500"); } }