Package apama :: Module iaf
[hide private]
[frames] | no frames]

Source Code for Module apama.iaf

  1  #!/usr/bin/env python 
  2  # Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights 
  3  # Copyright (c) 2013 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its Subsidiaries and or/its Affiliates and/or their licensors. 
  4  # Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG 
  5   
  6  import sys, os, string, logging, socket, copy 
  7   
  8  from pysys import log 
  9  from pysys.constants import * 
 10  from pysys.exceptions import * 
 11  from pysys.utils.filereplace import replace as rep 
 12  from pysys.process.helper import ProcessWrapper 
 13   
 14  from apama.common import XArgsHolder 
 15  from apama.common import stringToUnicode 
 16  from xml.dom.minidom import getDOMImplementation 
 17   
 18   
19 -class IAFHelper:
20 """Helper class for the Software AG Apama Integration Adapter Framework (IAF). 21 22 The IAF Helper class has been designed for use as an extension module to the PySys System Test 23 Framework, offering the ability to configure, start and interact with an instance of the IAF. The usage 24 pattern of the class is to create an instance per IAF, and to then make method calls onto the instance 25 to perform operations such as to start the component, request reload of the configuration file, perform 26 management operationes etc. For example:: 27 28 iaf = IAFHelper(self, config="iaf-config.xml") 29 iaf.start(logfile="iaf.log") 30 iaf.client(reload=True) 31 32 Process related methods of the class declare a method signature which includes named parameters for the 33 most frequently used options to the method. They also declare the **xargs parameter to allow passing in of 34 additional supported arguments to the process. The additional arguments that are currently supported via 35 **xargs are:: 36 37 workingDir: The default value for the working directory of a process 38 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND) 39 timeout: The default value of the process timeout 40 stdout: The default value of the process stdout 41 stderr: The default value of the process stderr 42 arguments: List of extra arguments to be passed to the process 43 44 This means that legitimate calls to the start method include:: 45 46 iaf.start(logfile="iaf.log") 47 iaf.start(logfile="iaf.log", stdout="correlator1.out") 48 iaf.start(state=FOREGROUND, timeout=5) 49 50 51 @ivar parent: Reference to the PySys testcase instantiating this class instance 52 @type parent: pysys.basetest 53 @ivar port: Port used for starting and interaction with the Event Correlator 54 @type port: integer 55 @ivar host: Hostname for interaction with a remote Event Correlator 56 @type host: string 57 @ivar environ: The environment for running the IAF 58 @type environ: dictionary 59 60 """ 61
62 - def __init__(self, parent, port=None, host=None):
63 """Create an instance of the IAFHelper class. 64 65 If no port parameter is used in the argument list an available port will be dynamically found 66 from the OS and used for starting the IAF, and performing all operations against it. The host 67 parameter is only used to perform operations against a remote IAF started external to the PySys 68 framework - the class does not support the starting of an IAF remote to the localhost. 69 70 @param parent: Reference to the parent PySys testcase 71 @param port: The port used for starting and interacting with the IAF 72 @param host: The hostname used for interaction with a remote IAF 73 74 """ 75 self.parent = parent 76 self.port = port 77 self.host = host 78 if self.port == None: self.port=parent.getNextAvailableTCPPort() 79 if self.host == None: self.host="localhost" 80 self.environ = {} 81 for key in os.environ: self.environ[stringToUnicode(key)] = stringToUnicode(os.environ[key]) 82 83 # make a check on the project variable this helper requires 84 for variable in ["APAMA_COMMON_JRE"]: 85 try: 86 if getattr(PROJECT, variable) == "": 87 raise Exception("Project variable %s not set within the environment" % variable) 88 except AttributeError: 89 raise Exception("Project variable %s undefined in .pysysproject" % variable)
90 91
92 - def addToClassPath(self, path):
93 """Add the supplied path to the APAMA_IAF_CLASSPATH environment variable for starting this IAF instance. 94 95 """ 96 if self.environ.has_key("APAMA_IAF_CLASSPATH"): 97 self.environ["APAMA_IAF_CLASSPATH"] = r"%s%s%s" % (self.environ["APAMA_IAF_CLASSPATH"], ENVSEPERATOR, os.path.normpath(path)) 98 else: 99 self.environ["APAMA_IAF_CLASSPATH"] = os.path.normpath(path)
100 101
102 - def addToPath(self, path):
103 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this IAF instance. 104 105 """ 106 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" 107 else: key = "PATH" 108 109 if self.environ.has_key(key): 110 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path)) 111 else: 112 self.environ[key] = os.path.normpath(path)
113 114
115 - def start(self, configname, configdir=None, replace={}, logfile=None, verbosity=None, **xargs):
116 """Start the IAF. 117 118 Start an IAF using the supplied configuration file. If the C{configdir} argument is not 119 supplied, the location of the configuration file defaults to the testcase input 120 directory. The configuration file can first be tailored to replaced token values within 121 the file with values required at run time using the C{replace} argument. For replace of the 122 form :: 123 124 replace = {"@logs_dir@":"/var/tmp/logs"} 125 126 any tokens in the coniguration file directly matching @logs_dir@ will be replaced with the 127 value "/var/tmp/logs". Note that multiple token value pairs can be supplied in the replace 128 dictionary. 129 130 @param configname: The IAF configuration file or template name 131 @param configdir: The directory containing the IAF configuration file or template 132 @param logfile: Name of the IAF log file 133 @param verbosity: The verbosity level of the IAF logging 134 @param replace: A dictionary of tokens / values to be replaced in the configuration file 135 @param xargs: Variable argument list for the additional supported process parameters 136 137 """ 138 # set the command and display name 139 command = os.path.join(PROJECT.APAMA_HOME, 'bin', 'iaf') 140 displayName = "iaf" 141 142 # set the default stdout and stderr 143 instances = self.parent.getInstanceCount(displayName) 144 dstdout = os.path.join(self.parent.output, 'iaf.out') 145 dstderr = os.path.join(self.parent.output, 'iaf.err') 146 if instances: dstdout = "%s.%d" % (dstdout, instances) 147 if instances: dstderr = "%s.%d" % (dstderr, instances) 148 if not configdir: configdir = self.parent.input 149 150 # transform xargs into an instance of the xargs holder class 151 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr) 152 153 # tailor the configuration file 154 output = os.path.join(self.parent.output, configname) 155 rep(os.path.join(configdir, configname), output, replace, marker='') 156 157 # set the arguments to the process 158 arguments = [] 159 arguments.extend(["-p", "%d" % self.port]) 160 if logfile: arguments.extend(["-f", logfile]) 161 if verbosity: arguments.extend(["-l", verbosity]) 162 if xargs.arguments: arguments.extend(xargs.arguments) 163 arguments.append(os.path.join(self.parent.output, configname)) 164 165 # start the process - unicoding environment needs moving into the PySys framework 166 env = {} 167 for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) 168 hprocess = self.parent.startProcess(command, arguments, env, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName) 169 self.waitForIAFUp() 170 return hprocess
171 172
173 - def client(self, reload=False, suspend=False, resume=False, **xargs):
174 """Perform client operations against the IAF (reload, suspend, resume). 175 176 @param reload: Request reload of the IAF configuration file 177 @param suspend: Request the IAF to suspend event sending 178 @param resume: Request the IAF to resume event sending 179 @param xargs: Variable argument list for the additional supported process parameters 180 181 """ 182 # set the command and display name 183 command = os.path.join(PROJECT.APAMA_HOME, 'bin', 'iaf_client') 184 displayName = "iaf_client" 185 186 # set the default stdout and stderr 187 instances = self.parent.getInstanceCount(displayName) 188 dstdout = os.path.join(self.parent.output, 'iafclient.out') 189 dstderr = os.path.join(self.parent.output, 'iafclient.err') 190 if instances: dstdout = "%s.%d" % (dstdout, instances) 191 if instances: dstderr = "%s.%d" % (dstderr, instances) 192 193 # transform xargs into an instance of the xargs holder class 194 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 195 196 # set the arguments to the process 197 arguments = [] 198 arguments.extend(["-p", "%d" % self.port]) 199 if self.host: arguments.extend(["-n", self.host]) 200 if reload: arguments.append("--reload") 201 elif suspend: arguments.append("--suspend") 202 elif resume: arguments.append("--resume") 203 if xargs.arguments: arguments.extend(xargs.arguments) 204 205 # start the process 206 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
207 208
209 - def waitForIAFUp(self, timeout=20):
210 """Block until the IAF declares itself to be ready for processing. 211 212 """ 213 # set the command and display name 214 command = os.path.join(PROJECT.APAMA_HOME, 'bin', 'iaf_management') 215 216 # set the default stdout and stderr 217 stdout = os.path.join(self.parent.output, 'iaf_management.out') 218 stderr = os.path.join(self.parent.output, 'iaf_management.err') 219 220 # set the arguments to the process 221 arguments = [] 222 arguments.append("-v") 223 arguments.append("-w") 224 arguments.extend(["-p", "%d" % self.port]) 225 arguments.extend(["-n", self.host]) 226 227 self.parent.log.info("Waiting for IAF to come up ...") 228 try: 229 process = ProcessWrapper(command, arguments, self.environ, self.parent.output, FOREGROUND, timeout, stdout, stderr) 230 process.start() 231 except ProcessError, e: 232 raise Exception("Unable to start iaf_management to check for IAF up: %s"%e) 233 except ProcessTimeout: 234 process.stop() 235 raise Exception("Timed out after %d seconds waiting for the IAF to come up"%timeout)
236