Package apama :: Module correlator
[hide private]
[frames] | no frames]

Source Code for Module apama.correlator

   1  #!/usr/bin/env python 
   2  # Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights 
   3  # Copyright (c) 2013-2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. 
   4  # Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG 
   5   
   6  import sys, os, string, logging, socket, copy, random, types 
   7   
   8  from pysys import log, ThreadFilter 
   9  from pysys.constants import * 
  10  from pysys.exceptions import * 
  11   
  12  from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr 
  13  from apama.common import XArgsHolder 
  14  from apama.common import stringToUnicode 
  15   
  16  from xml.dom.minidom import getDOMImplementation 
  17   
  18   
19 -class CorrelatorHelper(ApamaServerProcess):
20 """Helper class for the Software AG Apama Event Correlator. 21 22 The Correlator Helper class has been designed for use as an extension module to the PySys System Test 23 Framework, offering the ability to configure, start and interact with an Event Correlator. The usage 24 pattern of the class is to create an instance per Correlator, and to then make method calls onto the 25 instance to perform operations such as the injection of EPL or java JMON applications, the 26 sending of events, deletion of named objects etc. For example:: 27 28 correlator = CorrelatorHelper(self, name='mycorrelator') 29 correlator.start(logfile="mycorrelator.log") 30 correlator.inject(filenames=["simple.mon"]) 31 32 Process related methods of the class declare a method signature which includes named parameters for the 33 most frequently used options to the method. They also declare the **xargs parameter to allow passing in 34 of additional supported arguments to the process. The additional arguments that are currently supported 35 via **xargs are:: 36 37 workingDir: The default value for the working directory of a process 38 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND) 39 timeout: The default value of the process timeout 40 stdout: The default value of the process stdout 41 stderr: The default value of the process stderr 42 arguments: List of extra arguments to be passed to the process 43 44 This means that legitimate calls to the start method include:: 45 46 correlator.start(logfile="correlator.log") 47 correlator.start(logfile="correlator.log", stdout="correlator1.out") 48 correlator.start(state=FOREGROUND, timeout=5) 49 50 51 @ivar parent: Reference to the PySys testcase instantiating this class instance 52 @type parent: pysys.basetest 53 @ivar port: Port used for starting and interaction with the Correlator 54 @type port: integer 55 @ivar host: Hostname for interaction with a remote Correlator 56 @type host: string 57 @ivar environ: The environment for running the Correlator 58 @type environ: dictionary 59 60 """ 61 62 """ Holds the Java classpath used when starting a correlator with JVM.""" 63 classpath = None 64 65 licence = None 66 67
68 - def __init__(self, parent, port=None, host=None, name='correlator'):
69 """Create an instance of the CorrelatorHelper class. 70 71 If no port parameter is used in the argument list an available port will be dynamically found from 72 the OS and used for starting the Correlator, and performing all operations against it. The host 73 parameter is only used to perform operations against a remote Correlator started external to the 74 PySys framework - the class does not support the starting of a Correlator remote to the localhost. 75 76 @param parent: Reference to the parent PySys testcase 77 @param port: The port used for starting and interacting with the Correlator 78 @param host: The hostname used for interaction with a remote Correlator 79 @param name: A display name for this process (default is "correlator"), also used for the default 80 stdout/err filenames. 81 82 """ 83 ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host) 84 self.injectJMON = self.injectJava # old alias for this method 85 """ 86 Alias for injectJava 87 @see: L{injectJava} 88 """ 89 self.injectMon = self.injectEPL # shorter alias for injecting a .mon file 90 """ 91 Alias for injectEPL 92 @see: L{injectEPL} 93 """ 94 self.injectMonitorscript = self.injectEPL # Backwards compatibility 95 """ 96 Alias for injectEPL 97 @see: L{injectEPL} 98 """ 99 100 if self.environ.has_key("CLASSPATH"): # will remove this when we can, it's not robust for this env var to affect behaviour 101 self.parent.log.warn('The CLASSPATH environment variable is set and will be passed to the correlator process - this behaviour is deprecated and will be removed in a future release so please ensure your test does not rely on this and unset CLASSPATH before invoking pysys if possible (CLASSPATH=%s)', self.environ["CLASSPATH"]) 102 self.classpath = self.environ["CLASSPATH"] 103 """ 104 Initialise the classpath to the environment CLASSPATH, if it exists. 105 This will be removed once the correlator is no longer allowed to use CLASSPATH. 106 """ 107 108 self.licence = os.path.join(self.parent.project.APAMA_WORK, 'license', 'ApamaServerLicense.xml') 109 if not os.path.exists(self.licence): 110 # Don't put licence on the command line if it doesn't exist - run 111 # in evaluation mode, or find it automatically 112 self.licence = None 113 """ Set the licence file location. """
114 115 116
117 - def addToClassPath(self, path):
118 """Add the supplied path to the Java classpath variable for starting this instance. 119 """ 120 if self.classpath == None: 121 self.classpath = os.path.normpath(path) 122 else: 123 self.classpath = r"%s%s%s" % (self.classpath, ENVSEPERATOR, os.path.normpath(path))
124
125 - def addToPath(self, path):
126 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance. 127 128 """ 129 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" 130 else: key = "PATH" 131 132 if self.environ.has_key(key): 133 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path)) 134 else: 135 self.environ[key] = os.path.normpath(path)
136 137
138 - def start(self, logfile=None, verbosity=None, java=False, Xclock=None, environ=None, inputLog=None, waitForServerUp=True, config = None, **xargs):
139 """Start the Correlator. 140 141 @param logfile: Name of the Correlator log file (if used, set this to something similar to the display "name" 142 passed to the constructor) 143 @param verbosity: The verbosity level of the Correlator logging 144 @param java: If pysys.constants.False then the Correlator will be started with support for JMON applications 145 @param Xclock: If pysys.constants.True then the Correlator will be started in externally clocked mode 146 @param environ: Map of environment variables to override. 147 @param inputLog: Relative or absolute path of file to write input log to, containing all events, EPL and other 148 inputs sent to the correlator. The format of the input log may change without notice so should not be 149 replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the 150 input log can also be used to pass information to customer support in the event of a problem. 151 @param waitForServerUp: Set to False to disable automatically waiting until the component is ready 152 @param config: path or list of paths to a initialization or connectivity configuration file or directory 153 containing them 154 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 155 156 """ 157 # set the command and display name 158 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'correlator') 159 160 dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name) 161 162 # transform xargs into an instance of the xargs holder class 163 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project) 164 165 # set the arguments to the process 166 arguments = [] 167 arguments.extend(["--name", self.name, "-p", "%d" % self.port]) 168 if self.licence is not None: arguments.extend(["-l", "%s" % self.licence]) 169 if logfile: arguments.extend(["-f", logfile]) 170 if verbosity: arguments.extend(["-v", verbosity]) 171 if java: arguments.append("--java") 172 if inputLog: arguments.extend(["--inputLog", os.path.join(self.parent.output, inputLog)]) 173 if Xclock: arguments.append("-Xclock") 174 if config != None: 175 if isinstance(config, basestring): 176 arguments.extend(['--config', config]) 177 elif type(config) is types.ListType: 178 for param in config: 179 arguments.extend(['--config', param]) 180 else: 181 raise Exception("Input parameter for config is not a string or list type") 182 if java and self.classpath: 183 arguments.extend(["--javaopt", "-Djava.class.path=%s" % self.classpath]) 184 arguments.extend(xargs.arguments) 185 186 # start the process - unicoding environment needs moving into the PySys framework 187 env = {} 188 if getattr(self.parent, 'eplcoverage','').lower()=='true': 189 env['AP_EPL_COVERAGE_FILE'] = 'correlator.${PID}.eplcoverage' 190 191 for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) 192 if environ: 193 for key in environ: env[stringToUnicode(key)] = stringToUnicode(environ[key]) 194 195 if self.process and self.process.running(): 196 raise Exception('Cannot start component as an instance is already running: %s'%self) 197 198 self.process = None 199 try: 200 hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs) 201 self.process = hprocess 202 if waitForServerUp: 203 self.waitForComponentUp(timeout=xargs.timeout) 204 except Exception: 205 for f in [logfile, xargs.stdout, xargs.stderr]: 206 if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*', 'Invalid Correlator argument.*'], tail=True): break 207 raise 208 209 return hprocess
210 211
212 - def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False, 213 logChannels=False, **xargs):
214 """Attach a receiver to the Correlator. 215 216 Returns the process for the receiver. 217 218 @param filename: The basename of the file to write events received from the Correlator to 219 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 220 @param channels: List of channel names to subscribe to 221 @param logChannels: Print the channel each event came from in the output 222 @param suppressBatch: Do not include BATCH timestamps in the output 223 @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived 224 @param utf8: Write output in UTF8 225 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 226 227 """ 228 # set the command and display name 229 command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_receive') 230 displayName = 'engine_receive <%s> [%s]'%(self.name, os.path.basename(filename)) 231 232 # set the default stdout and stderr 233 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'receive') 234 235 # transform xargs into an instance of the xargs holder class 236 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) 237 238 # set location of output file (defaults to output subdirectory) 239 if not filedir: filedir = self.parent.output 240 file = os.path.join(filedir, filename) 241 242 # set the arguments to the process 243 arguments = [] 244 arguments.extend(["-p", "%d" % self.port]) 245 if self.host: arguments.extend(["-n", self.host]) 246 if filename: arguments.extend(["-f", file]) 247 if suppressBatch: arguments.append("--suppressbatch") 248 if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch") 249 if utf8: arguments.append("--utf8") 250 if logChannels: 251 arguments.append("--logChannels") 252 arguments.extend(xargs.arguments) 253 for channel in channels: 254 arguments.extend(['--channel', channel]) 255 256 # start the process 257 proc = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs) 258 self.parent.waitForFile(filename, filedir = filedir) 259 return proc
260 261
262 - def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
263 """Obtain runtime operational statistics from the Correlator. 264 265 By default this runs as a BACKGROUND process. The process is returned by the method call. 266 267 @param filename: The basename of the file to write the runtime operational status to 268 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 269 @param raw: Obtain csv format data when logging to file 270 @param interval: The polling interval (seconds) between logging to file 271 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 272 273 @note: When outputing data in the raw (csv) format, the column identifiers and their positions 274 are defined by: 275 - Uptime = 0 276 - Number of monitors = 1 277 - Number of mthreads = 2 278 - Number of java applications = 3 279 - Number of listeners = 4 280 - Number of sub-listeners = 5 281 - Number of event types = 6 282 - Number of events on input queue = 7 283 - Number of events received = 8 284 - Number of events on the route queue = 9 285 - Number of events routed = 10 286 - Number of attached consumers = 11 287 - Number of events on output queue = 12 288 - Number of output events created = 13 289 - Number of output events sent = 14 290 - Number of events processed = 15 291 292 """ 293 # set the command and display name 294 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_watch') 295 296 # set the default stdout and stderr 297 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'watch') 298 299 displayName = "engine_watch <%s> -> %s"%(self.name, os.path.basename(filename or dstdout)) 300 301 # transform xargs into an instance of the xargs holder class 302 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) 303 304 # set location of output file (defaults to output subdirectory) 305 if not filedir: filedir = self.parent.output 306 307 # set the arguments to the process 308 arguments = [] 309 arguments.extend(["-p", "%d" % self.port]) 310 if self.host: arguments.extend(["-n", self.host]) 311 if filename: arguments.extend(["-f", os.path.join(filedir, filename)]) 312 if raw: arguments.append("--raw") 313 if interval: arguments.extend(["-i", "%d" % interval]) 314 arguments.extend(xargs.arguments) 315 316 # start the process 317 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
318
319 - def inspect(self, filename='inspect.txt', filedir=None, raw=False, **xargs):
320 """Obtain information about what application(s) have been injected 321 into the Correlator and what listeners are in existence. 322 323 This runs as a FOREGROUND process. 324 325 @param filename: The basename of the file to write the information to, e.g. inspect.txt 326 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 327 @param raw: Use parser-friendly output format 328 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 329 330 """ 331 assert filename 332 333 # set the command and display name 334 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_inspect') 335 displayName = "engine_inspect <%s> -> %s"%(self.name, os.path.basename(filename)) 336 337 # set the default stdout and stderr 338 dstdout,dstderr = os.path.join(self.parent.output, filename), os.path.join(self.parent.output, filename.replace('.txt','')+'.err') 339 340 # transform xargs into an instance of the xargs holder class 341 xargs=XArgsHolder(xargs, state=FOREGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) 342 343 # set location of output file (defaults to output subdirectory) 344 if not filedir: filedir = self.parent.output 345 file = os.path.join(filedir, filename) 346 347 # set the arguments to the process 348 arguments = [] 349 arguments.extend(["-p", "%d" % self.port]) 350 if self.host: arguments.extend(["-n", self.host]) 351 if raw: arguments.append("--raw") 352 arguments.extend(xargs.arguments) 353 354 # start the process 355 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
356 357
358 - def initialize(self, path, correlatorName=None, properties=None, include=None, exclude=None, **xargs):
359 """Initialize the Correlator by injecting all the files making up the project, 360 typically based on a Designer launch configuration .deploy file. 361 362 This is usually the simplest way to inject all the files from an application 363 into the correlator. Alternative approaches are to call the L{injectEPL} and related 364 methods individually for each file, or to specify the files in the "initialization" 365 section of a yaml file passed into the correlator L{start} call using the "config" argument. 366 367 Queries and Digital Event .mon files will be generated automatically as part of injection, 368 but any Java jar files must be compiled manually before invoking this method. 369 370 @param path: Path of a .deploy file from Designer (recommended), a directory, 371 or a text file listing the files to be injected. 372 Must be an absolute path, or relative to the testcase output dir. 373 @param correlatorName: The name of the Correlator as specified in the launch configuration .deploy 374 file, e.g "defaultCorrelator". If not specified, the name of this pysys correlator will be used. 375 @param properties: Optional path to a .properties file specifying ${var} placeholders 376 to be used for resolving the paths of any files outside the project directory. 377 Absolute path or relative to output dir. 378 @param include: a comma-separated string specifying which of the project files found by the tool 379 should be injected, e.g. "**/foo/Bar*.evt,**.mon". If not specified, all files will be included 380 (unless specifically excluded) 381 @param exclude: a comma-separated string specifying which of the project files found by the tool 382 should NOT be injected, e.g. "**/foo/Bar*.evt". 383 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 384 385 """ 386 # set the command and display name 387 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") 388 389 # set the default stdout and stderr 390 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'initialize-%s'%self.name) 391 392 # transform xargs into an instance of the xargs holder class 393 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 394 395 # set the input and output names for qry and mon 396 path = os.path.join(self.parent.output, path) 397 398 # set arguments to the process 399 arguments = [] 400 arguments.append("-Djava.awt.headless=true") 401 arguments.append("-DAPAMA_LOG_IMPL=log4j") 402 arguments.append("-Dlog4j.configuration=file:///%s/etc/engine-deploy-java-log4j.properties"%self.parent.project.APAMA_HOME.replace('\\','/')) 403 arguments.append("-Djava.io.tmpdir=%s"%self.parent.output) 404 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', 'ap-generate-project-init-list.jar')]) 405 arguments.append(path) 406 if properties: 407 assert isinstance(properties, basestring) 408 arguments.append(os.path.join(self.parent.output, properties)) 409 if correlatorName or ('!' not in path): 410 arguments.extend(['--correlatorName', correlatorName or self.name]) 411 arguments.extend(['--inject', self.host, str(self.port)]) 412 if include: 413 assert isinstance(include, basestring) 414 arguments.extend(['--include', include]) 415 if exclude: 416 assert isinstance(exclude, basestring) 417 arguments.extend(['--exclude', exclude]) 418 arguments.extend(xargs.arguments) 419 420 # start the process to generate the EPL 421 try: 422 status = self.parent.startProcess(command, arguments, self.environ, 423 displayName='initialize <%s> [%s]'%(self.name, os.path.basename(path)), **xargs.kwargs) 424 finally: 425 self.parent.logFileContents(xargs.stderr, maxLines=50) 426 self.parent.logFileContents(xargs.stdout, maxLines=50) 427 # in case there are queries involved 428 return status
429 430
431 - def injectEPL(self, filenames=[], filedir=None, utf8=False, **xargs):
432 """Inject EPL *.mon files into the Correlator. 433 434 See also L{initialize}. 435 436 @param filenames: List of the basename of EPL files to inject into the Correlator 437 @param filedir: Directory containing the input EPL files (defaults to testcase input directory) 438 @param utf8: Assume input is in UTF8 439 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 440 441 """ 442 return self.__inject(filenames, filedir, utf8, False, False, **xargs)
443
444 - def injectJava(self, filename, filedir=None, **xargs):
445 """Inject a Java plug-in or application into the Correlator. 446 447 See also L{initialize}. 448 449 @param filename: The basename of the jar file to inject into the Correlator 450 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 451 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 452 453 """ 454 return self.__inject([filename], filedir, False, True, False, **xargs)
455 456
457 - def injectCDP(self, filenames=[], filedir=None, **xargs):
458 """Inject Correlator deployment package into the Correlator. 459 460 See also L{initialize}. 461 462 @param filenames: List of the basename of cdp files to inject into the Correlator 463 @param filedir: Directory containing the input cdp files (defaults to testcase input directory) 464 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 465 466 """ 467 return self.__inject(filenames, filedir, False, False, True, **xargs)
468 469
470 - def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs):
471 """Inject a Scenario into the Correlator. 472 473 See also L{initialize}. 474 475 @param filename: The basename of the scenario definition file to inject into the Correlator 476 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 477 @param debug: Generate debug EPL 478 @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location 479 @param functions: Sequence of tuples, where each tuple contains a function catalog name and location 480 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 481 482 """ 483 # set the command and display name 484 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") 485 jarbase = "ap-modeler" 486 487 # set the default stdout and stderr 488 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase) 489 490 # transform xargs into an instance of the xargs holder class 491 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 492 493 # creae the scenario manager configuration file 494 self.__createScenarioManagerConfig(blocks, functions) 495 496 # set the input and output names for sdf and mon 497 monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon") 498 if not filedir: filedir = self.parent.input 499 sdfName = os.path.join(filedir, filename) 500 501 # set arguments to the process 502 arguments = [] 503 arguments.append("-Djava.awt.headless=true") 504 arguments.append("-DAPAMA_LOG_IMPL=simple") 505 arguments.append("-DAPAMA_LOG_LEVEL=INFO") 506 507 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) 508 arguments.extend(["-c", os.path.join(self.parent.output, "sm-config.xml")]) 509 if not debug: arguments.extend(["-XgenerateDebug", "false"]) 510 arguments.extend(["-Xgenerate", sdfName, monitorName]) 511 arguments.extend(xargs.arguments) 512 513 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) 514 515 # start the process to generate the EPL 516 status = self.parent.startProcess(command, arguments, self.environ, 517 displayName='scenario generation for %s'%os.path.basename(sdfName), **xargs.kwargs) 518 519 # if successful inject the EPL 520 if status and status.exitStatus == 0: 521 status = self.__inject([monitorName]) 522 if status and status.exitStatus == 0: 523 # delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc 524 os.remove(monitorName) 525 if status and status.exitStatus != 0 and not ignoreExitStatus: 526 self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
527
528 - def injectQuery(self, filename, filedir=None, diagnostics=False, **xargs):
529 """Inject a Query into the Correlator. 530 531 See also L{initialize}. 532 533 @param filename: The basename of the query file to inject into the Correlator 534 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 535 @param diagnostics: Enable runtime diagnostic logging in the query 536 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 537 538 """ 539 # set the command and display name 540 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") 541 jarbase = "ap-query-codegen" 542 543 # set the default stdout and stderr 544 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase) 545 546 # transform xargs into an instance of the xargs holder class 547 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 548 549 # set the input and output names for qry and mon 550 monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon") 551 if not filedir: filedir = self.parent.input 552 qryName = os.path.join(filedir, filename) 553 554 # set arguments to the process 555 arguments = [] 556 arguments.append("-Djava.awt.headless=true") 557 arguments.append("-DAPAMA_LOG_IMPL=simple") 558 arguments.append("-DAPAMA_LOG_LEVEL=INFO") 559 560 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) 561 arguments.extend(["--host", self.host, "--port", '%s' % self.port, qryName, monitorName]) 562 if diagnostics: arguments.extend(["--diagnostics"]) 563 arguments.extend(xargs.arguments) 564 565 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) 566 567 # start the process to generate the EPL 568 status = self.parent.startProcess(command, arguments, self.environ, 569 displayName='query generation for %s'%os.path.basename(qryName), **xargs.kwargs) 570 571 # if successful, inject the EPL 572 if status and (status.exitStatus==0): 573 status = self.__inject([monitorName]) 574 if status and status.exitStatus==0: 575 # delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc 576 try: 577 os.remove(monitorName) 578 except Exception, e: 579 self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e) 580 self.flush(count=6) 581 if status and status.exitStatus != 0 and not ignoreExitStatus: 582 self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
583 584
585 - def sendEventStrings(self, *eventStrings, **xargs):
586 """Send one or more event strings into the Correlator. 587 588 This method writes a temporary file containing the specified strings. 589 590 See the documentation for engine_send for more information. 591 592 @param eventStrings: One or more event strings to be sent to this correlator. 593 May be unicode or byte strings. May include a channel designator. 594 595 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 596 597 @keyword channel: The channel to which events are to be sent except when specified 598 on a per-event basis. If a channel is not specified for 599 an event and you do not specify this option, the event is delivered to the 600 default channel, which means the event will go to all public contexts. 601 602 """ 603 # set the command and display name 604 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send') 605 606 # set the default stdout and stderr 607 dstdout,dstderr = self.parent.allocateUniqueStdOutErr('send') 608 609 assert eventStrings # must not be empty 610 assert not isinstance(eventStrings, basestring) # should be a list of strings, not a string 611 612 # transform xargs into an instance of the xargs holder class 613 tmpfile = dstderr.replace('.err','')+'.tmp.evt' 614 with open(tmpfile, 'w') as f: 615 for l in eventStrings: 616 print >>f, l.strip().encode('utf-8') 617 618 displayName = "engine_send <%s> [%s%s]"%(self.name, eventStrings[0], ' (+%d others)'%(len(eventStrings)-1) if len(eventStrings)>1 else '') 619 620 # set the arguments to the process 621 arguments = [] 622 arguments.extend(["-p", "%d" % self.port]) 623 if self.host: arguments.extend(["-n", self.host]) 624 arguments.append('--utf8') 625 if xargs.get('channel',''): arguments.extend(["--channel", xargs.pop('channel')]) 626 arguments.append(tmpfile) 627 628 kwargs = xargs 629 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 630 arguments.extend(xargs.arguments) 631 632 # start the process 633 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
634 635 636
637 - def send(self, filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs):
638 """Send events from one or more file into the Correlator. 639 640 See the documentation for engine_send for more information. 641 642 @param filenames: List of the basename of event files to send into the Correlator 643 @param filedir: Directory containing the input event files (defaults to testcase input directory) 644 @param loop: Number of times to loop through the input file 645 @param utf8: Assume input is in UTF8 646 @param channel: The channel to which events are to be sent except when specified 647 on a per-event basis. If a channel is not specified for 648 an event and you do not specify this option, the event is delivered to the 649 default channel, which means the event will go to all public contexts. 650 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 651 652 """ 653 # set the command and display name 654 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send') 655 656 # set the default stdout and stderr 657 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'send') 658 659 # transform xargs into an instance of the xargs holder class 660 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 661 662 # set location of input files (defaults to input directory) 663 files=[] 664 if not filedir: filedir = self.parent.input 665 if type(filenames) is types.ListType: 666 for file in filenames: files.append(os.path.join(filedir, file)) 667 elif isinstance(filenames, basestring): 668 files.append(os.path.join(filedir, filenames)) 669 else: 670 raise Exception("Input parameter for filenames is not a string or list type") 671 672 displayName = "engine_send <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files))) 673 674 # set the arguments to the process 675 arguments = [] 676 arguments.extend(["-p", "%d" % self.port]) 677 if self.host: arguments.extend(["-n", self.host]) 678 if loop: arguments.extend(["--loop", "%s" % loop]) 679 if utf8: arguments.append("--utf8") 680 if channel: arguments.extend(["--channel", channel]) 681 arguments.extend(xargs.arguments) 682 arguments.extend(files) 683 684 # start the process 685 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
686 687
688 - def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
689 """Delete named objects from the Event Crrelator. 690 691 @param names: List of names to delete from the Correlator 692 @param filename: The basename of a file containing a set of names to delete 693 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 694 @param force: Force deletion of names even if they are in use 695 @param kill: Kill name even if it is a running monitor 696 @param all: Delete everything in the Correlator 697 @param utf8: Assume input is in UTF8 698 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 699 700 """ 701 # set the command and display name 702 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_delete') 703 displayName = "engine_delete <%s> %s"%(self.name, '<all>' if all else '[%s]'%(' '.join(names))) 704 705 # set the default stdout and stderr 706 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'delete') 707 708 # transform xargs into an instance of the xargs holder class 709 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 710 711 # set location of input files (defaults to input directory) 712 if not filedir and filename: 713 filedir = self.parent.input 714 filename = os.path.join(filedir, filename) 715 716 # set the arguments to the process 717 arguments = [] 718 arguments.extend(["-p", "%d" % self.port]) 719 if self.host: arguments.extend(["-n", self.host]) 720 if filename: arguments.extend(["-f", filename]) 721 if force: arguments.append("--force") 722 if kill: arguments.append("--kill") 723 if all: arguments.extend(["--all","--yes"]) 724 if utf8: arguments.append("--utf8") 725 arguments.extend(xargs.arguments) 726 if names: arguments.extend(names) 727 728 # start the process 729 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
730 731
732 - def connect(self, source, channel=None, channels=None, mode=None, **xargs):
733 """Connect a Correlator to this instance as a source. 734 735 @param source: An instance of the L{CorrelatorHelper} class to act as the source 736 @param channel: The channel to make the connection on 737 @param channels: The list of channels to make the connection on 738 @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel 739 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 740 741 """ 742 # set the command and display name 743 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_connect') 744 displayName = "engine_connect %s<%s channel '%s' -> %s>"%( 745 'disconnect ' if ('-x' in xargs.get('arguments',[]) or '--disconnect' in xargs.get('arguments',[])) else '', 746 source.name, channel or '*', self.name) 747 748 # set the default stdout and stderr 749 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'connect') 750 751 # transform xargs into an instance of the xargs holder class 752 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 753 754 # set the arguments to the process 755 arguments = [] 756 arguments.extend(["-sn", source.host]) 757 arguments.extend(["-sp", "%d" % source.port]) 758 arguments.extend(["-tn", self.host]) 759 arguments.extend(["-tp", "%d" % self.port]) 760 if channel: arguments.extend(["-c", channel]) 761 if channels: 762 assert not isinstance(channels, basestring) # should be a list of strings, not a string 763 for c in channels: 764 arguments.extend(["-c", c]) 765 if mode: arguments.extend(["-m", mode]) 766 arguments.extend(xargs.arguments) 767 768 # start the process 769 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
770 771
772 - def disconnect(self, source, channel=None, channels=None, mode=None, **xargs):
773 """Disconnect a correlator to this instance as a source correlator. 774 775 @param source: An instance of the L{CorrelatorHelper} class acting as the source 776 @param channel: The channel to be disconnected 777 @param channels: The list of channels to be disconnected 778 @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel 779 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 780 781 """ 782 if xargs.has_key('arguments'): 783 if xargs['arguments'].count('-x') == 0 and xargs['arguments'].count('--disconnect') == 0: 784 xargs['arguments'].append('-x') 785 else: 786 xargs['arguments'] = ['-x'] 787 self.connect(source, channel, channels, mode, **xargs)
788 789
790 - def applicationEventLogging(self, enable=True, **xargs):
791 """Enable and disable application event logging. 792 793 Provides a wrapper around the engine_management command line tool to enable and disable 794 application event logging. Once enabled, application event logging will log to the correlator 795 log file information specific processing occurrences, e.g. the receipt of events for 796 processing, the triggering of listeners, execution of the garbage collector etc. 797 798 @param enable: Set to True to enable, set to False to disable event logging 799 800 """ 801 if enable: 802 self.manage(arguments=['-r', 'applicationEventLogging', 'on'], **xargs) 803 else: 804 self.manage(arguments=['-r', 'applicationEventLogging', 'off'], **xargs)
805 806
807 - def setApplicationLogFile(self, filename=None, filedir=None, **xargs):
808 """Set the application log file name. 809 810 On setting the application log file details, the output of all native log commands within 811 EPL will be logged to the designated log file. This allows separation between the 812 log statements written by the Event Correlator i.e. for status, errors etc, and those 813 generated by the actual application. 814 815 @param filename: The basename of the file to write the application log file to 816 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 817 818 """ 819 if not filedir: filedir = self.parent.output 820 self.manage(arguments=['-r', 'setApplicationLogFile', os.path.join(filedir, filename)], **xargs)
821 822
823 - def setApplicationLogLevel(self, verbosity, **xargs):
824 """Set the application log level. 825 826 @param verbosity: The verbosity level of the application logging 827 828 """ 829 self.manage(arguments=['-r', 'setApplicationLogLevel', verbosity], **xargs)
830 831
832 - def profilingOn(self, **xargs):
833 """Inform the Event Correlator to start collecting profiling statistics. """ 834 self.manage(arguments=['-r', 'cpuProfile', 'on'], **xargs)
835 836
837 - def profilingOff(self, **xargs):
838 """Inform the Event Correlator to stop collecting profiling statistics. """ 839 self.manage(arguments=['-r', 'cpuProfile', 'off'], **xargs)
840 841
842 - def profilingReset(self, **xargs):
843 """Inform the Event Correlator to reset it's collection of profiling statistics. """ 844 self.manage(arguments=['-r', 'cpuProfile', 'reset'], **xargs)
845 846
847 - def profilingGet(self, filename, filedir=None, **xargs):
848 """Obtain the latest profiling statistics from the Event Correlator. 849 850 @param filename: The basename of the file to write the profiling statistics to 851 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 852 853 """ 854 if not filedir: filedir = self.parent.output 855 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','cpuProfile', 'get'], **xargs)
856 857
858 - def toStringAll(self, filename, filedir=None, **xargs):
859 """Obtain a stringified representation of the current application state from the Event Correlator. 860 861 @param filename: The basename of the file to write the dump of application state to 862 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 863 864 """ 865 if not filedir: filedir = self.parent.output 866 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'], **xargs)
867 868
869 - def waitForCorrelatorUp(self, *args, **kwargs):
870 """Block until the Correlator declares itself to be ready for processing. 871 872 @deprecated: Use waitForComponentUp instead. 873 """ 874 self.waitForComponentUp(*args, **kwargs)
875 876
877 - def flush(self, timeout=60, count=1, **xargs):
878 """Make sure all events have been flushed through the correlator. 879 880 Currently implemented by using the flushAllQueues management request. 881 Will initate a cycle where each queue in the correlator is drained, 882 optionally repeated count times. This is useful when you have a 883 multi-context application. 884 885 @param timeout: The amount of time to wait 886 @param count: The number of times to ensure queues are flushed 887 888 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 889 890 """ 891 flushAllQueues = 'flushAllQueues' 892 if count > 1: 893 self.manage(arguments=['-r', 'flushAllQueues', str(count)], timeout=timeout, **xargs) 894 else: 895 self.manage(arguments=['-r', 'flushAllQueues'], timeout=timeout, **xargs)
896 897
898 - def __inject(self, filenames=[], filedir=None, utf8=False, java=False, cdp=False, **xargs):
899 """Inject an application into the correlator. 900 901 Returns the process object (or None in some error cases) 902 903 """ 904 # set the command and display name 905 command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_inject') 906 907 # set the default stdout and stderr 908 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'inject') 909 910 # transform xargs into an instance of the xargs holder class 911 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 912 913 # set location of input files (defaults to input directory) 914 files=[] 915 if not filedir: filedir = self.parent.input 916 if type(filenames) is types.ListType: 917 for file in filenames: files.append(os.path.join(filedir, file)) 918 elif isinstance(filenames, basestring): 919 files.append(os.path.join(filedir, filenames)) 920 else: 921 raise Exception("Input parameter for filenames is not a string or list type") 922 displayName = "engine_inject <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files))) 923 924 # set the arguments to the process 925 arguments = [] 926 arguments.extend(["-p", "%d" % self.port]) 927 if self.host: arguments.extend(["-n", self.host]) 928 if utf8: arguments.append("--utf8") 929 if java: arguments.append("--java") 930 if cdp: arguments.append("--cdp") 931 arguments.extend(xargs.arguments) 932 arguments.extend(files) 933 934 # remove from xargs so we don't pass it to startProcess directly 935 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) 936 937 # start the process 938 result = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs) 939 firstwarningline = '' 940 if result and result.exitStatus != 0: 941 with open(xargs.stderr) as f: 942 for l in f: 943 l = l.replace(self.parent.input+os.sep,'').strip() 944 if not l: continue 945 if not firstwarningline: firstwarningline = l 946 self.parent.log.warning(' %s'%l) 947 if result and result.exitStatus != 0 and not ignoreExitStatus: 948 self.parent.addOutcome(BLOCKED, 949 '%s failed: %s'%(result, firstwarningline) if firstwarningline else '%s returned non-zero exit code %d'%(result, result.exitStatus), 950 abortOnError=self.parent.defaultAbortOnError) 951 952 return result
953 954
955 - def __createScenarioManagerConfig(self, blocks=None, functions=None):
956 """Create the Scenario Manager configuration file for the location of all blocks and functions. 957 958 """ 959 impl = getDOMImplementation() 960 document = impl.createDocument(None, "config", None) 961 rootElement = document.documentElement 962 963 # create the catalogs entry 964 catalogsElement = document.createElement("catalogs") 965 rootElement.appendChild(catalogsElement) 966 967 # create the blocks element 968 blocksElement = document.createElement("blocks") 969 970 try: 971 try: 972 if self.parent.project.BLOCKS_DIR == "": 973 raise Exception("") 974 except Exception: 975 self.parent.project.BLOCKS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "blocks") 976 except AttributeError: 977 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT") 978 979 tup = [("blocks", self.parent.project.BLOCKS_DIR)] 980 if blocks: tup.extend(blocks) 981 982 for name, location in tup: 983 catalog = document.createElement("catalog") 984 nameAtttribute = document.createAttribute("name") 985 nameAtttribute.value=name 986 typeAtttribute = document.createAttribute("type") 987 typeAtttribute.value="FILE" 988 locationAtttribute = document.createAttribute("location") 989 locationAtttribute.value=location 990 catalog.setAttributeNode(nameAtttribute) 991 catalog.setAttributeNode(typeAtttribute) 992 catalog.setAttributeNode(locationAtttribute) 993 blocksElement.appendChild(catalog) 994 catalogsElement.appendChild(blocksElement) 995 996 # create the functions elememt 997 functionsElement = document.createElement("functions") 998 999 try: 1000 try: 1001 if self.parent.project.FUNCTIONS_DIR == "": 1002 raise Exception("") 1003 except Exception: 1004 self.parent.project.FUNCTIONS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "functions") 1005 except AttributeError: 1006 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT") 1007 1008 tup = [("functions", self.parent.project.FUNCTIONS_DIR)] 1009 if functions: tup.extend(functions) 1010 1011 for name, location in tup: 1012 catalog = document.createElement("catalog") 1013 nameAtttribute = document.createAttribute("name") 1014 nameAtttribute.value=name 1015 typeAtttribute = document.createAttribute("type") 1016 typeAtttribute.value="FILE" 1017 locationAtttribute = document.createAttribute("location") 1018 locationAtttribute.value=location 1019 catalog.setAttributeNode(nameAtttribute) 1020 catalog.setAttributeNode(typeAtttribute) 1021 catalog.setAttributeNode(locationAtttribute) 1022 functionsElement.appendChild(catalog) 1023 catalogsElement.appendChild(functionsElement) 1024 1025 fp = open(os.path.join(self.parent.output, "sm-config.xml"), "w") 1026 fp.write(document.toprettyxml(indent=" ")) 1027 fp.close()
1028 1029
1030 - def hasLicence(self):
1031 """ Does this correlator instance have access to a licence file? """ 1032 return self.licence != None
1033