/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ package com.pcbsys.nirvana.apps; import com.pcbsys.nirvana.client.*; import java.util.*; public class RegisteredEvent extends nSampleApp implements nEventListener { /** * Demonstrates the use of Registered events for channels */ private static RegisteredEvent mySelf = null; /** * This method demonstrates the Nirvana API calls necessary to publish and consume registered events on * a channel. * It is called after all command line arguments have been received * and validated * * @param realmDetails a String[] containing the possible RNAME values * @param achannelName the channel name to publish to * @param count the number of messages to publish */ private void doit(String[] realmDetails, String achannelName, int count) throws nBaseClientException { mySelf.constructSession(realmDetails); try { nChannelAttributes nca = new nChannelAttributes(); nChannel myChannel = null; try { nca.setName(achannelName); myChannel = mySession.findChannel(nca); } catch (nIllegalArgumentException e1) { System.out.println("Channel name not recognised."); e1.printStackTrace(); } catch (nUnknownRemoteRealmException e) { System.out.println("Realm not recognised."); e.printStackTrace(); } catch (nIllegalChannelMode e) { System.out.println("Invalid channel mode."); e.printStackTrace(); } catch (nChannelNotFoundException ex) { nChannelAttributes cattr; try { cattr = new nChannelAttributes(achannelName, 0, 0, nChannelAttributes.SIMPLE_TYPE); cattr.useMergeEngine(true); nChannelPublishKeys[] pks = new nChannelPublishKeys[1]; pks[0] = new nChannelPublishKeys("ccy", 1); cattr.setPublishKeys(pks); myChannel = mySession.createChannel(cattr); } catch (nIllegalArgumentException e) { System.out.println("Channel attributes not recongised"); e.printStackTrace(); } catch (nUnknownRemoteRealmException e) { System.out.println("Realm could not be found"); e.printStackTrace(); } catch (nChannelAlreadyExistsException e) { System.out.println("Channel already exists."); e.printStackTrace(); } } // subscribe to channel try { myChannel.addSubscriber(this); } catch (nIllegalArgumentException e1) { System.out.println("Invalid channel subscriber."); e1.printStackTrace(); } catch (nChannelNotFoundException e1) { System.out.println("Could not find channel."); e1.printStackTrace(); } catch (nChannelAlreadySubscribedException e1) { System.out.println("CAlready subscribed to channel"); e1.printStackTrace(); } System.out.println("Creating and registering events"); nRegisteredEvent aud = null; nRegisteredEvent gbp = null; nRegisteredEvent eur = null; try { aud = myChannel.createRegisteredEvent("AUD"); nEventProperties props = aud.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("description", "Australian Dollar"); props.put("time", (byte) System.nanoTime()); gbp = myChannel.createRegisteredEvent((Object) "GBP"); props = gbp.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("description", "English Pound"); props.put("time", (byte) System.nanoTime()); eur = myChannel.createRegisteredEvent((Object) "EUR"); props = eur.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("description", "Euro"); props.put("time", (byte) System.nanoTime()); // The events are now registered with the channel so commit them // to the server try { aud.commitChanges(true); gbp.commitChanges(true); eur.commitChanges(true); } catch (nTransactionException e) { System.out.println("Could not commit events"); e.printStackTrace(); } catch (nChannelNotFoundException e) { System.out.println("Channel could not be found."); e.printStackTrace(); } } catch (nIllegalArgumentException e1) { e1.printStackTrace(); } System.out.println("All events are now registered and committed to the server"); // Loop as many times as the number of messages we want to publish int published = 0; while (published < count) { try { // Both name and bid will be sent to the server as we readd them on each iteration. // The server will see that these properties have not changed so will not send the // events to the subscriptions aud.getProperties().put("bid", 0.7999); aud.getProperties().put("name", "James"); Thread.sleep(2000); aud.commitChanges(); // On the first iteration the consumer will receive all properties of the event because // each value has changed. On subsequent iterations, the bid and offer do not change // so the consumer will only receive the 'time' property gbp.getProperties().put("offer", 1.567); gbp.getProperties().put("bid", 1.67888); gbp.getProperties().put("time", new Date().toString()); gbp.commitChanges(); Thread.sleep(2000); //Because 'true' is passed to commitChanges, the current event registered on the server //will be overwritten with this event. Therefore the consumer will receive all properties //on each iteration. eur.getProperties().put("offer", 1.567); eur.getProperties().put("bid", 1.67888); eur.getProperties().put("time", new Date().toString()); eur.commitChanges(true);//Overwrites the current event on the server Thread.sleep(2000); published++; } catch (nSessionNotConnectedException ex) { while (!mySession.isConnected()) { System.out.println("Disconnected from Nirvana, Sleeping for 1 second..."); try { wait(1000); } catch (Exception e) { } } } catch (nBaseClientException ex) { System.out.println("Publish.cs : Exception : " + ex.toString()); throw ex; } catch (InterruptedException e) { System.out.println("Thread Interrpupted"); e.printStackTrace(); } } } catch (nSessionPausedException ps) { System.out.println("Session has been paused, please resume the session"); System.exit(1); } catch (nSecurityException se) { System.out.println("Insufficient permissions for the requested operation."); System.out.println("Please check the ACL settings on the server."); System.exit(1); } catch (nSessionNotConnectedException snce) { System.out.println("The session object used is not physically connected to the Nirvana realm."); System.out.println("Please ensure the realm is up and check your RNAME value."); System.exit(1); } catch (nUnexpectedResponseException ure) { System.out.println("The Nirvana REALM has returned an unexpected response."); System.out.println("Please ensure the Nirvana REALM and client API used are compatible."); System.exit(1); } catch (nRequestTimedOutException rtoe) { System.out.println("The requested operation has timed out waiting for a response from the REALM."); System.out.println("If this is a very busy REALM ask your administrator to increase the client timeout values."); System.exit(1); } // Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ex) { } // Close any other sessions so that we can exit nSessionFactory.shutdown(); } public void go(nConsumeEvent evt) { System.out.println("Channel Name : " + evt.getChannelName()); System.out.println("Event id : " + evt.getEventID()); System.out.println("Event tag : " + evt.getEventTag()); System.out.println("Is Delta : " + evt.getAttributes().isDelta()); // Print the message data if (evt.hasAttributes()) { displayEventAttributes(evt.getAttributes()); } nEventProperties prop = evt.getProperties(); if (prop != null) { displayEventProperties(prop); } } protected void processArgs(String[] args) { // Need a min of 2, rname, channel name if (args.length < 2) { Usage(); System.exit(2); } String RNAME = args[0]; String channelName = args[1]; int count = 1000; if (args.length > 2) { count = Integer.parseInt(args[2]); } // // Run the sample app // try { mySelf.doit(parseRealmProperties(RNAME), channelName, count); } catch (nBaseClientException e) { e.printStackTrace(); } } public static void main(String[] args) { // Create an instance for this class mySelf = new RegisteredEvent(); // Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { System.out.println("Usage ...\n"); System.out.println("RegisteredEvent [count] [size] \n"); System.out.println(" \n"); System.out.println(" - the rname of the server to connect to"); System.out.println(" - Channel name parameter for the channel to publish to"); System.out.println("\n[Optional Arguments] \n"); System.out.println("[count] -The number of events to publish (default: 10)"); } }