/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ using System; using System.Threading; using com.pcbsys.nirvana.client; namespace com.pcbsys.nirvana.apps { /** * Demonstrates the use of Registered events for channels */ class RegisteredEvent : nSampleApp, nEventListener { private bool isOk = true; private nBaseClientException asyncException = new nBaseClientException("Asynchronous exception received"); private static RegisteredEvent mySelf = null; /** * This method demonstrates the Nirvana API calls necessary to publish and consume registered events on * a channel. * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param achannelName the channel name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published */ private void doit(String[] realmDetails, String achannelName, int count, int size) { mySelf.constructSession(realmDetails); try { nChannelAttributes nca = new nChannelAttributes(); nChannel myChannel = null; nca.setName(achannelName); try { myChannel = mySession.findChannel(nca); } catch (nChannelNotFoundException ) { nChannelAttributes cattr = new nChannelAttributes(achannelName, 0, 0, nChannelAttributes.SIMPLE_TYPE); cattr.useMergeEngine(true); nChannelPublishKeys[] pks = new nChannelPublishKeys[1]; pks[0] = new nChannelPublishKeys("ccy", 1); cattr.setPublishKeys(pks); myChannel = mySession.createChannel(cattr); } // add the subscriber myChannel.addSubscriber(this); Console.WriteLine("Creating and registering events"); nRegisteredEvent aud = myChannel.createRegisteredEvent("AUD"); nEventProperties props = aud.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("description", "Australian Dollar"); props.put("time", DateTime.Now.ToLongTimeString()); nRegisteredEvent gbp = myChannel.createRegisteredEvent("GBP"); props = gbp.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("description", "English Pound"); props.put("time", DateTime.Now.ToLongTimeString()); nRegisteredEvent eur = myChannel.createRegisteredEvent("EUR"); props = eur.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("description", "Euro"); props.put("time", DateTime.Now.ToLongTimeString()); // The events are now registered with the channel so commit them to the server aud.commitChanges(true); gbp.commitChanges(true); eur.commitChanges(true); Console.WriteLine("All events are now registered and committed to the server"); //Loop as many times as the number of messages we want to publish int published = 0; while (published < count) { try { // Both name and bid will be sent to the server as we readd them on each iteration. // The server will see that these properties have not changed so will not send the // events to the subscriptions aud.getProperties().put("bid", 0.7999); aud.getProperties().put("name", "James"); Thread.Sleep(2000); aud.commitChanges(); // On the first iteration the consumer will receive all properties of the event because // each value has changed. On subsequent iterations, the bid and offer do not change // so the consumer will only receive the 'time' property gbp.getProperties().put("offer", 1.567); gbp.getProperties().put("bid", 1.67888); gbp.getProperties().put("time", DateTime.Now.ToLongTimeString()); gbp.commitChanges(); Thread.Sleep(2000); //Because 'true' is passed to commitChanges, the current event registered on the server //will be overwritten with this event. Therefore the consumer will receive all properties //on each iteration. eur.getProperties().put("offer", 1.567); eur.getProperties().put("bid", 1.67888); eur.getProperties().put("time", DateTime.Now.ToLongTimeString()); eur.commitChanges(true);//Overwrites the current event on the server Thread.Sleep(2000); published++; } catch (nSessionNotConnectedException ) { while (!mySession.isConnected()) { Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second..."); try { Thread.Sleep(1000); } catch (Exception ) { } } //We need to repeat the publish for the event publish that caused the exception, // so we reduce the counter } catch (nBaseClientException ex) { Console.WriteLine("Publish.cs : Exception : " + ex.Message); throw ex; } //Check if an asynchronous exception has been received if (!isOk) { //If it has, then throw it throw asyncException; } } } catch (nSessionPausedException ) { Console.WriteLine("Session has been paused, please resume the session"); Environment.Exit(1); } catch (nSecurityException ) { Console.WriteLine("Unsufficient permissions for the requested operation."); Console.WriteLine("Please check the ACL settings on the server."); Environment.Exit(1); } catch (nSessionNotConnectedException ) { Console.WriteLine("The session object used is not physically connected to the Nirvana realm."); Console.WriteLine("Please ensure the realm is up and check your RNAME value."); Environment.Exit(1); } catch (nUnexpectedResponseException ) { Console.WriteLine("The Nirvana REALM has returned an unexpected response."); Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible."); Environment.Exit(1); } catch (nRequestTimedOutException ) { Console.WriteLine("The requested operation has timed out waiting for a response from the REALM."); Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values."); Environment.Exit(1); } //Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ) { } //Close any other sessions so that we can exit nSessionFactory.shutdown(); } public void go(nConsumeEvent evt) { Console.WriteLine("Channel Name : " + evt.getChannelName()); Console.WriteLine("Event id : " + evt.getEventID()); Console.WriteLine("Event tag : " + evt.getEventTag()); Console.WriteLine("Is Delta : " + evt.getAttributes().isDelta()); //Print the message data if (evt.hasAttributes()) { displayEventAttributes(evt.getAttributes()); } nEventProperties prop = evt.getProperties(); if (prop != null) { displayEventProperties(prop); } } protected override void processArgs(String[] args) { // // Need a min of 2, rname, channel name if (args.Length < 2) { Usage(); Environment.Exit(2); } String RNAME = args[0]; ; var channelName = args[1]; int count = 1000; if (args.Length > 2) { count = Convert.ToInt32(args[2]); } int size = 1024; if (args.Length > 3) { size = Convert.ToInt32(args[3]); } // // Run the sample app // mySelf.doit(parseRealmProperties(RNAME), channelName, count, size); } public static void Main(String[] args) { //Create an instance for this class mySelf = new RegisteredEvent(); //Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { Console.WriteLine("Usage ...\n"); Console.WriteLine("RegisteredEvent [count] [size] \n"); Console.WriteLine( " \n"); Console.WriteLine( " - the rname of the server to connect to"); Console.WriteLine( " - Channel name parameter for the channel to publish to"); Console.WriteLine( "\n[Optional Arguments] \n"); Console.WriteLine( "[count] -The number of events to publish (default: 10)"); Console.WriteLine( "[size] - The size (bytes) of the event to publish (default: 100)"); } } }