/* * * Copyright (c) 1999 - 2011 my-Channels Ltd * Copyright (c) 2012 - 2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. * * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG. * */ using System; using System.Collections.Generic; using System.Threading; using com.pcbsys.nirvana.client; namespace com.pcbsys.nirvana.apps { class txpublish : nSampleApp { private static txpublish mySelf = null; private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding(); /** * This method demonstrates the Nirvana API calls necessary to publish to * a channel transactionally. * It is called after all command line arguments have been received and * validated * * @param realmDetails a String[] containing the possible RNAME values * @param achannelName the channel name to publish to * @param count the number of messages to publish * @param size the size (bytes) of each message to be published * @param txSize the number of messages to be published in each transaction */ private void doit(String[] realmDetails, String achannelName, int count, int size, int txSize) { mySelf.constructSession(realmDetails); //Publishes to the specified channel try { //Create a channel attributes object nChannelAttributes nca = new nChannelAttributes(); nca.setName(achannelName); //Obtain a reference to the channel nChannel myChannel = mySession.findChannel(nca); //Create a byte array filled with characters equal to // the message size specified. This could be a result //of String.getBytes() call in a real world scenario. byte[] buffer = new byte[size]; for (int x = 0; x < size; x++) { buffer[x] = (byte)((x % 90) + 32); } //Create a list that contains the messages to be published on each //transactional publish call List vevents = new List(); for (int x = 0; x < txSize; x++) { //nEventProperties props = new nEventProperties(); //You can add other types in a dictionary object //props.put("key0string", "1"); //props.put("key1int", (int)1); //props.put("key2long", (long)-11); //props.put("key3bool", true); //props.put("key4short", (short)1); //props.put("key5float", (float)1.123456); //props.put("key6double", (double)1.123456); //props.put("key7char", '1'); nConsumeEvent evt = new nConsumeEvent("TXID-" + x,buffer); // Note the Tag does NOT need the TX vevents.Add(evt); } //Inform the user that publishing is about to start Console.WriteLine("Starting publish of " + count + " events of size " + size); //Obtain the channel's last event ID prior to us publishing anything long startEid = myChannel.getLastEID(); //Get a timestamp to be used to calculate the message publishing rates DateTime start = DateTime.Now; //Create a transaction attributes object based on the channel we wish to publish to nTransactionAttributes TXAttrib = new nTransactionAttributes(myChannel, 1000); //Define a transaction object nTransaction tx = null; //Loop as many times as the number of messages we want to publish //divided by the number of events per transaction for (int x = 0; x < ((int)(count / txSize)); x++) { try { //Create a new transaction object tx = nTransactionFactory.create(TXAttrib); //Publish the list of messages create previously tx.publish(vevents); //Commit the transaction tx.commit(); } catch (nSessionNotConnectedException ) { while (!mySession.isConnected()) { Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second..."); try { Thread.Sleep(1000); } catch (Exception ) { } } if (tx != null) { try { tx.commit(); Console.WriteLine("Commited tx after failover..."); } catch (Exception ) { Console.WriteLine("Got error trying to commit transaction as it had started on another Nirvana realm, republishing event.."); x--; } } else { Console.WriteLine("Got error trying to commit transaction as it had started on another Nirvana realm, republishing event.."); x--; } } } //Calculate the actual number of events published by obtaining //the channel's last eid after our publishing and subtracting //the channel's last eid before our publishing. //This also ensures that all client queues have been flushed. long events = myChannel.getLastEID() - startEid; //Get a timestamp to calculate the publishing rates DateTime end = DateTime.Now; TimeSpan dif = end - start; //Calculate the events / sec rate long eventPerSec = ((events * 1000) / ((dif.Milliseconds))); //Calculate the bytes / sec rate long bytesPerSec = eventPerSec * size; //Inform the user of the resulting rates Console.WriteLine("Events = " + events + " Events/sec = " + eventPerSec + " Bytes/Sec = " + bytesPerSec); } //Handle errors catch (nChannelNotFoundException ) { Console.WriteLine("The channel specified could not be found."); Console.WriteLine("Please ensure that the channel exists in the REALM you connect to."); Environment.Exit(1); } catch (nSecurityException ) { Console.WriteLine("Unsufficient permissions for the requested operation."); Console.WriteLine("Please check the ACL settings on the server."); Environment.Exit(1); } catch (nSessionNotConnectedException ) { Console.WriteLine("The session object used is not physically connected to the Nirvana realm."); Console.WriteLine("Please ensure the realm is up and check your RNAME value."); Environment.Exit(1); } catch (nUnexpectedResponseException ) { Console.WriteLine("The Nirvana REALM has returned an unexpected response."); Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible."); Environment.Exit(1); } catch (nUnknownRemoteRealmException ) { Console.WriteLine("The channel specified resided in a remote realm which could not be found."); Console.WriteLine("Please ensure the channel name specified is correct."); Environment.Exit(1); } catch (nRequestTimedOutException ) { Console.WriteLine("The requested operation has timed out waiting for a response from the REALM."); Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values."); Environment.Exit(1); } catch (nTransactionAlreadyCommittedException ) { Console.WriteLine("The transaction you attempted to commit has already been commited on this REALM."); Environment.Exit(1); } catch (nTransactionNotStartedException ) { Console.WriteLine("The transaction you attempted to commit has not yet started on this REALM."); Environment.Exit(1); } catch (nBaseClientException ) { Console.WriteLine("An error occured while creating the Channel Attributes object."); Environment.Exit(1); } //Close the session we opened try { nSessionFactory.close(mySession); } catch (Exception ) { } //Close any other sessions so that we can exit nSessionFactory.shutdown(); } protected override void processArgs(String[] args) { // // Need a min of 2, rname, channel name if (args.Length < 2) { Usage(); Environment.Exit(2); } String RNAME = args[0]; ; var channelName = args[1]; int count = 10; if (args.Length > 2) { count = Convert.ToInt32(args[2]); } int size = 100; if (args.Length > 3) { size = Convert.ToInt32(args[3]); } int txSize = 1; if (args.Length > 4) { txSize = Convert.ToInt32(args[4]); } // // Run the sample app // mySelf.doit(parseRealmProperties(RNAME), channelName, count, size, txSize); } public static void Main(String[] args) { //Create an instance for this class mySelf = new txpublish(); //Process command line arguments mySelf.processArgs(args); } /** * Prints the usage message for this class */ private static void Usage() { Console.WriteLine("Usage ...\n"); Console.WriteLine("txpublish [count] [size] [tx size] \n"); Console.WriteLine( " \n"); Console.WriteLine( " - the rname of the server to connect to"); Console.WriteLine( " - Channel name parameter for the channel to publish to"); Console.WriteLine( "\n[Optional Arguments] \n"); Console.WriteLine( "[count] -The number of events to publish (default: 10)"); Console.WriteLine( "[size] - The size (bytes) of the event to publish (default: 100)"); Console.WriteLine( "[tx size] - The number of events per transaction (default: 1)"); } } }