Package apama :: Module common
[hide private]
[frames | no frames]

Source Code for Module apama.common

  1  #!/usr/bin/env python 
  2  # Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights reserved 
  3  # Copyright (c) 2013,2015-2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. 
  4  # Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG 
  5  # 
  6  # This module contains internal implementation utility code for pysys-apama,  
  7  # which is unsupported and may be removed at any time.  
  8  # 
  9  """ 
 10  @undocumented: XArgsHolder, _initProject, _allocateUniqueProcessStdOutErr, stringToUnicode 
 11  """ 
 12   
 13  import time 
 14  from pysys import log 
 15  from pysys.constants import * 
 16  from pysys.exceptions import * 
 17  from pysys.process.helper import ProcessWrapper # only used when we need special handling, startProcess is usually better 
 18   
def _initProject(project):
    """
    Internal-only, do not use.

    Checks that the project has non-empty APAMA_HOME and APAMA_WORK
    attributes, then fills in APAMA_BIN_DIR, APAMA_COMMON_JRE and
    APAMA_LIBRARY_VERSION when they are missing or empty.

    @param project: The project object whose attributes are checked and defaulted in place
    @raise Exception: If APAMA_HOME or APAMA_WORK is missing or empty
    """
    # Explicit checks rather than assert statements: asserts are stripped when
    # python runs with -O, which would silently disable this validation.
    if not (getattr(project, "APAMA_HOME", None) and getattr(project, "APAMA_WORK", None)):
        raise Exception("Not running inside an Apama environment. APAMA_HOME and APAMA_WORK not set")

    # Supply default values for these if we can
    if not getattr(project, "APAMA_BIN_DIR", None):
        project.APAMA_BIN_DIR = os.path.join(project.APAMA_HOME, 'bin')
    if not getattr(project, "APAMA_COMMON_JRE", None):
        project.APAMA_COMMON_JRE = os.getenv('APAMA_COMMON_JRE', project.APAMA_HOME+'/../jvm/jvm/jre')
    if not getattr(project, "APAMA_LIBRARY_VERSION", None): # no longer needed so doesn't matter
        project.APAMA_LIBRARY_VERSION = os.getenv('APAMA_LIBRARY_VERSION', 'UNKNOWN_APAMA_LIBRARY_VERSION')
36
class ApamaServerProcess(object):
    """Abstract parent helper class for Apama server processes.

    @ivar parent: Reference to the PySys testcase instantiating this class instance
    @type parent: pysys.basetest
    @ivar port: Port used for starting and interaction with the process
    @type port: integer
    @ivar host: Hostname for interaction with a remote process
    @type host: string
    @ivar environ: The environment for running the process
    @type environ: dictionary

    """

    def __init__(self, parent, name, port=None, host=None):
        """Create an instance of the class.

        If no port parameter is used in the argument list an available port will be dynamically found from
        the OS and used for starting the process, and performing all operations against it. The host
        parameter is only used to perform operations against a remote process started externally to the
        PySys framework - the class does not support the starting of a remote process.

        @param parent: Reference to the parent PySys testcase
        @param name: Name of this component, used in display names and output file names
        @param port: The port used for starting and interacting with the Correlator
        @param host: The hostname used for interaction with a remote Correlator
        @raise Exception: If the parent has no project, or the project is not a valid Apama environment

        """
        self.parent = parent
        self.name = name
        self.process = None # only ever valid if started locally

        # Check the project exists before anything dereferences it, then
        # validate/default the Apama project settings exactly once.
        if not self.parent.project: raise Exception("PROJECT is not set - pysys environment not configured correctly")
        _initProject(self.parent.project)

        def cleanup():
            try:
                if self.process and self.process.running(): # only if started by us
                    self.shutdown(ignoreExitStatus=True)
            except Exception as e:
                # for now don't actually cause a failure if this happens; could add that as an option later if it's useful
                self.parent.log.error('Failed to shutdown %s: %s', self, e)

        if getattr(self.parent.project, 'shutdownApamaComponentsAfterTest', 'true').lower() == 'true':
            parent.addCleanupFunction(cleanup)

        self.port = int(port) if port else port
        self.host = host

        if self.port is None: self.port = parent.getNextAvailableTCPPort()
        if self.host is None: self.host = "localhost"

        self.log = self.parent.log

        # take a unicode copy of the current process environment
        self.environ = {}
        for key in os.environ:
            self.environ[stringToUnicode(key)] = stringToUnicode(os.environ[key])

    def waitForComponentUp(self, timeout=TIMEOUTS['WaitForSocket'], **xargs):
        """Block until the component declares itself to be ready for processing.

        Runs component_management with the -w option in the background and polls it. Failures are
        reported through the parent's addOutcome (TIMEDOUT or BLOCKED) rather than by raising directly.

        @param timeout: Maximum time in seconds to wait for the component
        @param xargs: Accepted for signature compatibility; not used by this method

        """
        # set the command and display name
        command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'component_management')

        # set the default stdout and stderr
        stdout, stderr = self.parent.allocateUniqueStdOutErr('waitForComponentUp_%s'%self.name)

        # set the arguments to the process
        arguments = ["-w", "-p", "%d" % self.port, "-n", self.host]

        waitprocess = ProcessWrapper(command, arguments, self.environ, self.parent.output, BACKGROUND, timeout, stdout, stderr, displayName='waitForComponentUp %s'%self)
        waitprocess.start()
        try:
            endtime = time.time()+timeout
            while time.time() < endtime and waitprocess.running():
                # if the component was started locally, then early-out if it has failed
                if self.process and not self.process.running():
                    self.parent.addOutcome(BLOCKED, '%s failed with exit code %d in waitForComponentUp'%(self.process, self.process.exitStatus), abortOnError=self.parent.defaultAbortOnError)
                    return # in case it doesn't abort
                time.sleep(0.05)

            if waitprocess.running():
                self.parent.addOutcome(TIMEDOUT, 'Timed out waiting for %s to come up (after %d secs)'%(self, timeout), abortOnError=self.parent.defaultAbortOnError)
            elif waitprocess.exitStatus != 0:
                # shouldn't really happen
                self.parent.addOutcome(BLOCKED, 'component_management failed while waiting for %s to come up, with exit status %d'%
                    (self, waitprocess.exitStatus), abortOnError=self.parent.defaultAbortOnError)
            else:
                # a short wait is needed to detect port in use startup failures where the component is already running on the port,
                # to give time for the process to try and fail to bind the port.
                # set it to 0 by default to keep testcases running as fast as possible, but
                # may be set to a higher number for deployments
                sltime = float(getattr(self.parent.project, 'APAMA_START_COMPONENT_PORT_IN_USE_SLEEP_SECS', '0'))
                if sltime > 0:
                    time.sleep(sltime)

                # in case a different component was started on this port from the local one we wanted
                if self.process and not self.process.running():
                    self.parent.addOutcome(BLOCKED, '%s failed with exit code %d during waitForComponentUp'%(self.process, self.process.exitStatus), abortOnError=self.parent.defaultAbortOnError)
                    return # in case it doesn't abort; don't log that a failed component is running

                self.parent.log.info(" Component %s is now running", self)
        finally:
            # never leave the background waiter behind, whichever way we exit
            if waitprocess.running(): waitprocess.stop()

    def shutdown(self, message='Shutdown requested by test', timeout=60, **args):
        """ Requests a clean shutdown of the component.

        If it was started by this object, also waits for the process to
        terminate, and silently ignores requests to shutdown if the process
        was already stopped.

        @param message: The message passed to component_management with --shutdown
        @param timeout: Timeout in seconds, used both for the shutdown request and for waiting on the local process
        @param args: Extra keyword arguments passed through to L{manage}
        """
        if self.process and not self.process.running(): return
        self.manage(arguments=['--shutdown', message], timeout=timeout, displayName='shutdown', **args)
        if self.process:
            self.process.wait(timeout=timeout)

    def running(self):
        """ Returns True if this has a local process that is running, or
        False if it has stopped, or was not started by this object.
        """
        # bool() so callers always get True/False rather than None or the process object
        return bool(self.process and self.process.running())

    def __repr__(self):
        """ Returns a display string of the form name<[pid N on ]host:port>. """
        return '%s<%s%s:%d>'%(
            self.name,
            ('pid %s on '%self.process.pid) if (self.process and self.process.pid) else '',
            self.host, self.port
            )

    def manage(self, arguments=None, displayName="manage", **xargs):
        """Execute component_management operations against the process.

        @param arguments: The list of arguments to be passed to component_management (not a string)
        @param displayName: Short name for the operation, used for the output file names and the process display name
        @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, workingDir
        @return: The value returned by the parent's startProcess
        @raise Exception: If arguments is a string rather than a list

        """
        # set the command and display name
        command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'component_management')

        # cannot pass a string in here; checked before an output file name is allocated
        if isinstance(arguments, basestring): raise Exception('arguments must be a list not a string')

        dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, displayName)

        # transform xargs into an instance of the xargs holder class
        xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, arguments=arguments, project=self.parent.project)

        # set the arguments to the process
        arguments = ["-p", "%d" % self.port]
        if self.host: arguments.extend(["-n", self.host])
        if xargs.arguments: arguments.extend(xargs.arguments)

        if displayName == 'manage':
            displayName='%s <%s> [%s]'%(displayName, self.name, ' '.join(xargs.arguments))
            if '-s' in xargs.arguments or '--shutdown' in xargs.arguments:
                displayName = 'manage <%s> --shutdown'%self.name # keep it concise
        else:
            displayName = '%s <%s>'%(displayName, self.name)

        # start the process
        return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
204
def _allocateUniqueProcessStdOutErr(parent, processKey):
    """ Build stdout/stderr filenames for a process that is about to start.

    Names take the form processKey.out[.n] and processKey.err[.n], where n
    counts earlier allocations of the same processKey on this parent, so a
    name is never handed out twice during the parent's lifetime.

    Not for customer use, and legacy only - the pysys allocateUniqueStdOutErr method is recommended instead.

    Returns a tuple of (stdout, stderr).
    """
    # NB: use this only for existing methods (backwards compat) - for anything new the
    # pysys 1.1 parent.allocateUniqueStdOutErr method is better, but as it uses a different format
    # can't use it out of the box
    if not hasattr(parent, '_apama_uniqueProcessKeys'):
        parent._apama_uniqueProcessKeys = {}
    counters = parent._apama_uniqueProcessKeys

    # the first allocation for a key is numbered 0 and carries no suffix
    occurrence = counters.get(processKey, -1) + 1
    counters[processKey] = occurrence

    if occurrence > 0:
        suffix = '.%d' % occurrence
    else:
        suffix = ''

    return tuple(
        os.path.join(parent.output, processKey + extension + suffix)
        for extension in ('.out', '.err')
    )
227
def stringToUnicode(s):
    """
    @deprecated: Internal helper function: do not use, will be removed in a future release.

    Converts a unicode string or a utf-8 byte string into a unicode string.

    @param s: A unicode string (returned unchanged) or a utf-8 encoded byte string
    @return: The unicode string
    """
    # The unicode builtin only exists on Python 2; fall back to str where it
    # is missing so this helper does not fail with a NameError on Python 3.
    try:
        textType = unicode
    except NameError:
        textType = str

    if isinstance(s, textType):
        return s
    else:
        return textType(s, "utf8")
239 240
class XArgsHolder:
    """

    @deprecated: Internal helper class: do not use, will be removed in a future release.

    Class to store all supported xargs method arguments used in the helper classes.

    Methods in the Apama helper classes define process related methods with a signature including
    named parameters for the most commonly used options to the process, and an **xargs parameter to allow
    passing through of additional supported named parameters, e.g. workingDir, state, timeout, stdout, stderr and
    arguments. The XArgsHolder class takes the **xargs parameter from a method call (which is treated by
    python as a dictionary of name value pairs) and default values for the workingDir, state, timeout, stdout, stderr
    and arguments; these are used to set data attributes to the class instance with the default values. The class then
    iterates over the **xargs and over-writes the default values if they exist in the parameter. This allows a
    user of the class to create an instance to hold the additional arguments with default values in the first
    case, but for these to be replaced if an alternative value is supplied via **xargs, e.g. the user of the method wants
    to explicitly set the stdout etc.

    The kwargs member contains a dictionary that can be passed through to startProcess using **kwargs,
    containing every setting except for arguments which should always be handled directly.

    Typical usage would be startProcess(arguments=xargs.arguments, **xargs.kwargs)

    """

    def __init__(self, xargs, workingDir=None, state=FOREGROUND, timeout=TIMEOUTS['WaitForProcess'], stdout=None, stderr=None, arguments=None, ignoreExitStatus=None, project=None):
        """Create an instance of the XArgsHolder class.

        @param xargs: The variable argument list passed into the method, which override the defaults provided by the other arguments
        @param workingDir: The default value for the working directory of a process
        @param state: The default state of the process (L{pysys.constants.BACKGROUND} | L{pysys.constants.FOREGROUND})
        @param timeout: The default value of the process timeout
        @param stdout: The default value of the process stdout
        @param stderr: The default value of the process stderr
        @param arguments: Extra command line arguments to be passed to the process. None (the default) is
        treated as an empty list.
        @param ignoreExitStatus: Set to True to change default behaviour for this process to prevent
        non-zero exit code being treated as a test failure. The project setting
        defaultApamaIgnoreExitStatus can be used to control this globally.
        @param project: Optional project object, consulted for defaultApamaIgnoreExitStatus when
        ignoreExitStatus is not given
        @raise Exception: If the resulting arguments value is a string rather than a list

        """
        # this allows arbitrary extra args to be passed to startProcess e.g. abortOnError, anything we add in future
        self.kwargs = dict(xargs)

        # set kwargs and members for known keywords. Give priority to xargs provided if specified.
        self.workingDir = self.kwargs['workingDir'] = xargs.get('workingDir', workingDir)
        self.state = self.kwargs['state'] = xargs.get('state', state)
        self.timeout = self.kwargs['timeout'] = xargs.get('timeout', timeout)
        self.stdout = self.kwargs['stdout'] = xargs.get('stdout', stdout)
        self.stderr = self.kwargs['stderr'] = xargs.get('stderr', stderr)

        if ignoreExitStatus is None:
            # fall back to the project-wide setting if there is one, otherwise False
            ignoreExitStatus = project.defaultApamaIgnoreExitStatus.lower()=='true' if project and hasattr(project, 'defaultApamaIgnoreExitStatus') else False
        self.ignoreExitStatus = self.kwargs['ignoreExitStatus'] = xargs.get('ignoreExitStatus', ignoreExitStatus)

        # arguments should be handled differently, need to remove from kwargs since it's common for callers to
        # want to add to it and manipulate it directly
        self.arguments = self.kwargs.pop('arguments', arguments)

        # sanity checking
        # A fresh list per instance: the parameter default is None rather than a shared [] so that
        # a caller appending to self.arguments cannot affect later instances.
        if self.arguments is None: self.arguments = [] # ensure it is always a list
        if isinstance(self.arguments, basestring): raise Exception('arguments must be a list not a string')
302