/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ using System; using System.Threading; using com.pcbsys.nirvana.client; namespace com.pcbsys.nirvana.apps { /** * Writes to a nirvana data group using registered events. */ class DataGroupDeltaDelivery : nSampleApp, nDataStreamListener { private bool isOk = true; private nBaseClientException asyncException = new nBaseClientException("ASync Exception"); private static DataGroupDeltaDelivery mySelf; /** * This method demonstrates the Nirvana API calls necessary to write to a * data group. Each time a message is written, only those properties which * have changed will be sent to the server. It is called after all command * line arguments have been received and validated. * * @param realmDetails a String[] containing the possible RNAME values * @param count the number of iterations */ private void doit(String[] realmDetails, int count) { //Construct a session and pass this as the data stream listener mySelf.constructSession(realmDetails, this); try { //Create a data group nDataGroup aud_dg = mySession.createDataGroup("AUDGroup"); nDataGroup gbp_dg = mySession.createDataGroup("GBPGroup"); nDataGroup eur_dg = mySession.createDataGroup("EURGroup"); //add our stream to the groups so that we receive callbacks when an event is received aud_dg.add(myStream); gbp_dg.add(myStream); eur_dg.add(myStream); Console.WriteLine("Creating and registering events"); nRegisteredEvent aud = aud_dg.createRegisteredEvent(); nEventProperties props = aud.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("desc", "Australian Dollar"); props.put("time", DateTime.Now.ToShortTimeString()); nRegisteredEvent gbp = gbp_dg.createRegisteredEvent(); props = gbp.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("desc", "English Pound"); props.put("time", DateTime.Now.ToShortTimeString()); nRegisteredEvent eur = eur_dg.createRegisteredEvent(); props = eur.getProperties(); props.put("bid", 0.8999); props.put("offer", 0.9999); props.put("desc", "Euro"); props.put("time", DateTime.Now.ToShortTimeString()); // The events are now registered with the channel so commit them to the server // True is passed as a parameter to ensure we reset the event on the server aud.commitChanges(true); gbp.commitChanges(true); eur.commitChanges(true); Console.WriteLine("All events are now registered and committed to the server"); //Loop as many times as the number of messages we want to publish for (int i = 0; i < count;i++ ) { try { // Both name and bid will be sent to the server as we readd them on each iteration. // The server will see that these properties have not changed so will not send the // events to the subscriptions aud.getProperties().put("bid", 0.7999); aud.getProperties().put("name", "James"); Thread.Sleep(2000); aud.commitChanges(); // On the first iteration the consumer will receive all properties of the event because // each value has changed. On subsequent iterations, the bid and offer do not change // so the consumer will only receive the 'time' property gbp.getProperties().put("offer", 1.567); gbp.getProperties().put("bid", 1.67888); gbp.getProperties().put("time", DateTime.Now.ToLongTimeString()); gbp.commitChanges(); Thread.Sleep(2000); //Because 'true' is passed to commitChanges, the current event registered on the server //will be overwritten with this event. Therefore the consumer will receive all properties //on each iteration. eur.getProperties().put("offer", 1.567); eur.getProperties().put("bid", 1.67888); eur.getProperties().put("time", DateTime.Now.ToLongTimeString()); eur.commitChanges(true);//Overwrites the current event on the server Thread.Sleep(2000); } catch (nSessionNotConnectedException ) { while (!mySession.isConnected()) { Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second..."); try { Thread.Sleep(1000); } catch (Exception ) { } } } catch (nBaseClientException ex) { Console.WriteLine("Publish.cs : Exception : " + ex.Message); throw ex; } //Check if an asynchronous exception has been received if (!isOk) { //If it has, then throw it throw asyncException; } } } catch (nSessionPausedException ) { Console.WriteLine("Session has been paused, please resume the session"); Environment.Exit(1); } catch (nSecurityException ) { Console.WriteLine("Unsufficient permissions for the requested operation."); Console.WriteLine("Please check the ACL settings on the server."); Environment.Exit(1); } catch (nSessionNotConnectedException ) { Console.WriteLine("The session object used is not physically connected to the Nirvana realm."); Console.WriteLine("Please ensure the realm is up and check your RNAME value."); Environment.Exit(1); } catch (nUnexpectedResponseException ) { Console.WriteLine("The Nirvana REALM has returned an unexpected response."); Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible."); Environment.Exit(1); } catch (nRequestTimedOutException ) { Console.WriteLine("The requested operation has timed out waiting for a response from the REALM."); Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values."); Environment.Exit(1); } //Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ) { } //Close any other sessions so that we can exit nSessionFactory.shutdown(); } public void onMessage(nConsumeEvent evt) { Console.WriteLine("Group : " + evt.getDataGroupName()); Console.WriteLine("Event id : " + evt.getEventID()); Console.WriteLine("Event tag : " + evt.getEventTag()); Console.WriteLine("Is Delta : " + evt.getAttributes().isDelta()); //Print the message data if (evt.hasAttributes()) { displayEventAttributes(evt.getAttributes()); } nEventProperties prop = evt.getProperties(); if (prop != null) { displayEventProperties(prop); } Console.WriteLine(); } protected override void processArgs(String[] args) { // // Need a min of RNAME if (args.Length < 1) { Usage(); Environment.Exit(1); } String RNAME = args[0]; ; int count = 10; if (args.Length > 1) count = int.Parse(args[1]); // // Run the sample app // mySelf.doit(parseRealmProperties(RNAME), count); } public static void Main(String[] args) { //Create an instance for this class mySelf = new DataGroupDeltaDelivery(); //Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { Console.WriteLine("Usage ...\n"); Console.WriteLine("DataGroupDeltaDelivery [count] \n"); Console.WriteLine( " \n"); Console.WriteLine( " - the rname of the server to connect to"); Console.WriteLine( " \n"); Console.WriteLine( " - the number of times to commit the registered events - default : 10"); } } }