Package apama :: Module correlator
[hide private]
[frames] | no frames]

Source Code for Module apama.correlator

   1  #!/usr/bin/env python 
   2  # Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights 
   3  # Copyright (c) 2013-2017 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. 
   4  # Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG 
   5   
   6  import sys, os, string, logging, socket, copy, random, types 
   7   
   8  from pysys import log, ThreadFilter 
   9  from pysys.constants import * 
  10  from pysys.exceptions import * 
  11   
  12  from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr 
  13  from apama.common import XArgsHolder 
  14  from apama.common import stringToUnicode 
  15   
  16  from xml.dom.minidom import getDOMImplementation 
  17   
  18   
19 -class CorrelatorHelper(ApamaServerProcess):
20 """Helper class for the Software AG Apama Event Correlator. 21 22 The Correlator Helper class has been designed for use as an extension module to the PySys System Test 23 Framework, offering the ability to configure, start and interact with an Event Correlator. The usage 24 pattern of the class is to create an instance per Correlator, and to then make method calls onto the 25 instance to perform operations such as the injection of EPL or java JMON applications, the 26 sending of events, deletion of named objects etc. For example:: 27 28 correlator = CorrelatorHelper(self, name='mycorrelator') 29 correlator.start(logfile="mycorrelator.log") 30 correlator.inject(filenames=["simple.mon"]) 31 32 Process related methods of the class declare a method signature which includes named parameters for the 33 most frequently used options to the method. They also declare the **xargs parameter to allow passing in 34 of additional supported arguments to the process. The additional arguments that are currently supported 35 via **xargs are:: 36 37 workingDir: The default value for the working directory of a process 38 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND) 39 timeout: The default value of the process timeout 40 stdout: The default value of the process stdout 41 stderr: The default value of the process stderr 42 arguments: List of extra arguments to be passed to the process 43 44 This means that legitimate calls to the start method include:: 45 46 correlator.start(logfile="correlator.log") 47 correlator.start(logfile="correlator.log", stdout="correlator1.out") 48 correlator.start(state=FOREGROUND, timeout=5) 49 50 51 @ivar parent: Reference to the PySys testcase instantiating this class instance 52 @type parent: pysys.basetest 53 @ivar port: Port used for starting and interaction with the Correlator 54 @type port: integer 55 @ivar host: Hostname for interaction with a remote Correlator 56 @type host: string 57 @ivar environ: The environment for running the Correlator 58 @type environ: dictionary 59 60 """ 61 62 """ Holds the Java classpath used when starting a correlator with JVM.""" 63 classpath = None 64 65
66 - def __init__(self, parent, port=None, host=None, name='correlator'):
67 """Create an instance of the CorrelatorHelper class. 68 69 If no port parameter is used in the argument list an available port will be dynamically found from 70 the OS and used for starting the Correlator, and performing all operations against it. The host 71 parameter is only used to perform operations against a remote Correlator started external to the 72 PySys framework - the class does not support the starting of a Correlator remote to the localhost. 73 74 @param parent: Reference to the parent PySys testcase 75 @param port: The port used for starting and interacting with the Correlator 76 @param host: The hostname used for interaction with a remote Correlator 77 @param name: A display name for this process (default is "correlator"), also used for the default 78 stdout/err filenames. 79 80 """ 81 ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host) 82 self.injectJMON = self.injectJava # old alias for this method 83 """ 84 Alias for injectJava 85 @see: L{injectJava} 86 """ 87 self.injectMon = self.injectEPL # shorter alias for injecting a .mon file 88 """ 89 Alias for injectEPL 90 @see: L{injectEPL} 91 """ 92 self.injectMonitorscript = self.injectEPL # Backwards compatibility 93 """ 94 Alias for injectEPL 95 @see: L{injectEPL} 96 """ 97 98 if self.environ.has_key("CLASSPATH"): 99 self.classpath = self.environ["CLASSPATH"] 100 """ 101 Initialise the classpath to the environment CLASSPATH, if it exists. 102 This will be removed once the correlator is no longer allowed to use CLASSPATH. 103 """
104 105
106 - def addToClassPath(self, path):
107 """Add the supplied path to the Java classpath variable for starting this instance. 108 109 """ 110 if self.classpath == None: 111 self.classpath = os.path.normpath(path) 112 else: 113 self.classpath = r"%s%s%s" % (self.classpath, ENVSEPERATOR, os.path.normpath(path))
114
115 - def addToPath(self, path):
116 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance. 117 118 """ 119 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" 120 else: key = "PATH" 121 122 if self.environ.has_key(key): 123 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path)) 124 else: 125 self.environ[key] = os.path.normpath(path)
126 127
128 - def start(self, logfile=None, verbosity=None, java=False, Xclock=None, environ=None, inputLog=None, waitForServerUp=True, config = None, **xargs):
129 """Start the Correlator. 130 131 @param logfile: Name of the Correlator log file (if used, set this to something similar to the display "name" 132 passed to the constructor) 133 @param verbosity: The verbosity level of the Correlator logging 134 @param java: If pysys.constants.False then the Correlator will be started with support for JMON applications 135 @param Xclock: If pysys.constants.True then the Correlator will be started in externally clocked mode 136 @param environ: Map of environment variables to override. 137 @param inputLog: Relative or absolute path of file to write input log to, containing all events, EPL and other 138 inputs sent to the correlator. The format of the input log may change without notice so should not be 139 replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the 140 input log can also be used to pass information to customer support in the event of a problem. 141 @param waitForServerUp: Set to False to disable automatically waiting until the component is ready 142 @param config: path or list of paths to a initialization or connectivity configuration file or directory 143 containing them 144 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 145 146 """ 147 # set the command and display name 148 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'correlator') 149 150 dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name) 151 152 # transform xargs into an instance of the xargs holder class 153 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project) 154 155 # set the licence file location 156 licence = os.path.join(self.parent.project.APAMA_WORK, 'license', 'ApamaServerLicense.xml') 157 if not os.path.exists(licence): 158 # Don't put licence on the command line if it doesn't exist - run 159 # in evaluation mode, or find it automatically 160 licence = None 161 162 # set the arguments to the process 163 arguments = [] 164 arguments.extend(["--name", self.name, "-p", "%d" % self.port]) 165 if licence is not None: arguments.extend(["-l", "%s" % licence]) 166 if logfile: arguments.extend(["-f", logfile]) 167 if verbosity: arguments.extend(["-v", verbosity]) 168 if java: arguments.append("--java") 169 if inputLog: arguments.extend(["--inputLog", os.path.join(self.parent.output, inputLog)]) 170 if Xclock: arguments.append("-Xclock") 171 if config != None: 172 if isinstance(config, basestring): 173 arguments.extend(['--config', config]) 174 elif type(config) is types.ListType: 175 for param in config: 176 arguments.extend(['--config', param]) 177 else: 178 raise Exception("Input parameter for config is not a string or list type") 179 if java and self.classpath != None: 180 arguments.extend(["--javaopt", "-Djava.class.path=%s" % self.classpath]) 181 arguments.extend(xargs.arguments) 182 183 # start the process - unicoding environment needs moving into the PySys framework 184 env = {} 185 if getattr(self.parent, 'eplcoverage','').lower()=='true': 186 env['AP_EPL_COVERAGE_FILE'] = 'correlator.${PID}.eplcoverage' 187 188 for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) 189 if environ: 190 for key in environ: env[stringToUnicode(key)] = stringToUnicode(environ[key]) 191 192 if self.process and self.process.running(): 193 raise Exception('Cannot start component as an instance is already running: %s'%self) 194 195 self.process = None 196 try: 197 hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs) 198 self.process = hprocess 199 if waitForServerUp: 200 self.waitForComponentUp(timeout=xargs.timeout) 201 except Exception: 202 for f in [logfile, xargs.stdout, xargs.stderr]: 203 if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*', 'Invalid Correlator argument.*'], tail=True): break 204 raise 205 206 return hprocess
207 208
209 - def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False, 210 logChannels=False, **xargs):
211 """Attach a receiver to the Correlator. 212 213 Returns the process for the receiver. 214 215 @param filename: The basename of the file to write events received from the Correlator to 216 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 217 @param channels: List of channel names to subscribe to 218 @param logChannels: Print the channel each event came from in the output 219 @param suppressBatch: Do not include BATCH timestamps in the output 220 @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived 221 @param utf8: Write output in UTF8 222 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 223 224 """ 225 # set the command and display name 226 command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_receive') 227 displayName = 'engine_receive <%s> [%s]'%(self.name, os.path.basename(filename)) 228 229 # set the default stdout and stderr 230 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'receive') 231 232 # transform xargs into an instance of the xargs holder class 233 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) 234 235 # set location of output file (defaults to output subdirectory) 236 if not filedir: filedir = self.parent.output 237 file = os.path.join(filedir, filename) 238 239 # set the arguments to the process 240 arguments = [] 241 arguments.extend(["-p", "%d" % self.port]) 242 if self.host: arguments.extend(["-n", self.host]) 243 if filename: arguments.extend(["-f", file]) 244 if suppressBatch: arguments.append("--suppressbatch") 245 if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch") 246 if utf8: arguments.append("--utf8") 247 if logChannels: 248 arguments.append("--logChannels") 249 arguments.extend(xargs.arguments) 250 for channel in channels: 251 arguments.extend(['--channel', channel]) 252 253 # start the process 254 proc = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs) 255 self.parent.waitForFile(filename, filedir = filedir) 256 return proc
257 258
259 - def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
260 """Obtain runtime operational statistics from the Correlator. 261 262 By default this runs as a BACKGROUND process. The process is returned by the method call. 263 264 @param filename: The basename of the file to write the runtime operational status to 265 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 266 @param raw: Obtain csv format data when logging to file 267 @param interval: The polling interval (seconds) between logging to file 268 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 269 270 @note: When outputing data in the raw (csv) format, the column identifiers and their positions 271 are defined by: 272 - Uptime = 0 273 - Number of monitors = 1 274 - Number of mthreads = 2 275 - Number of java applications = 3 276 - Number of listeners = 4 277 - Number of sub-listeners = 5 278 - Number of event types = 6 279 - Number of events on input queue = 7 280 - Number of events received = 8 281 - Number of events on the route queue = 9 282 - Number of events routed = 10 283 - Number of attached consumers = 11 284 - Number of events on output queue = 12 285 - Number of output events created = 13 286 - Number of output events sent = 14 287 - Number of events processed = 15 288 289 """ 290 # set the command and display name 291 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_watch') 292 293 # set the default stdout and stderr 294 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'watch') 295 296 displayName = "engine_watch <%s> -> %s"%(self.name, os.path.basename(filename or dstdout)) 297 298 # transform xargs into an instance of the xargs holder class 299 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) 300 301 # set location of output file (defaults to output subdirectory) 302 if not filedir: filedir = self.parent.output 303 304 # set the arguments to the process 305 arguments = [] 306 arguments.extend(["-p", "%d" % self.port]) 307 if self.host: arguments.extend(["-n", self.host]) 308 if filename: arguments.extend(["-f", os.path.join(filedir, filename)]) 309 if raw: arguments.append("--raw") 310 if interval: arguments.extend(["-i", "%d" % interval]) 311 arguments.extend(xargs.arguments) 312 313 # start the process 314 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
315
316 - def inspect(self, filename='inspect.txt', filedir=None, raw=False, **xargs):
317 """Obtain information about what application(s) have been injected 318 into the Correlator and what listeners are in existence. 319 320 This runs as a FOREGROUND process. 321 322 @param filename: The basename of the file to write the information to, e.g. inspect.txt 323 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 324 @param raw: Use parser-friendly output format 325 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 326 327 """ 328 assert filename 329 330 # set the command and display name 331 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_inspect') 332 displayName = "engine_inspect <%s> -> %s"%(self.name, os.path.basename(filename)) 333 334 # set the default stdout and stderr 335 dstdout,dstderr = os.path.join(self.parent.output, filename), os.path.join(self.parent.output, filename.replace('.txt','')+'.err') 336 337 # transform xargs into an instance of the xargs holder class 338 xargs=XArgsHolder(xargs, state=FOREGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) 339 340 # set location of output file (defaults to output subdirectory) 341 if not filedir: filedir = self.parent.output 342 file = os.path.join(filedir, filename) 343 344 # set the arguments to the process 345 arguments = [] 346 arguments.extend(["-p", "%d" % self.port]) 347 if self.host: arguments.extend(["-n", self.host]) 348 if raw: arguments.append("--raw") 349 arguments.extend(xargs.arguments) 350 351 # start the process 352 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
353 354
355 - def initialize(self, path, correlatorName=None, properties=None, include=None, exclude=None, **xargs):
356 """Initialize the Correlator by injecting all the files making up the project, 357 typically based on a Designer launch configuration .deploy file. 358 359 This is usually the simplest way to inject all the files from an application 360 into the correlator. Alternative approaches are to call the L{injectEPL} and related 361 methods individually for each file, or to specify the files in the "initialization" 362 section of a yaml file passed into the correlator L{start} call using the "config" argument. 363 364 Queries and Digital Event .mon files will be generated automatically as part of injection, 365 but any Java jar files must be compiled manually before invoking this method. 366 367 @param path: Path of a .deploy file from Designer (recommended), a directory, 368 or a text file listing the files to be injected. 369 Must be an absolute path, or relative to the testcase output dir. 370 @param correlatorName: The name of the Correlator as specified in the launch configuration .deploy 371 file, e.g "defaultCorrelator". If not specified, the name of this pysys correlator will be used. 372 @param properties: Optional path to a .properties file specifying ${var} placeholders 373 to be used for resolving the paths of any files outside the project directory. 374 Absolute path or relative to output dir. 375 @param include: a comma-separated string specifying which of the project files found by the tool 376 should be injected, e.g. "**/foo/Bar*.evt,**.mon". If not specified, all files will be included 377 (unless specifically excluded) 378 @param exclude: a comma-separated string specifying which of the project files found by the tool 379 should NOT be injected, e.g. "**/foo/Bar*.evt". 380 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 381 382 """ 383 # set the command and display name 384 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") 385 386 # set the default stdout and stderr 387 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'initialize-%s'%self.name) 388 389 # transform xargs into an instance of the xargs holder class 390 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 391 392 # set the input and output names for qry and mon 393 path = os.path.join(self.parent.output, path) 394 395 # set arguments to the process 396 arguments = [] 397 arguments.append("-Djava.awt.headless=true") 398 arguments.append("-DAPAMA_LOG_IMPL=log4j") 399 arguments.append("-Dlog4j.configuration=file:///%s/etc/engine-deploy-java-log4j.properties"%self.parent.project.APAMA_HOME.replace('\\','/')) 400 arguments.append("-Djava.io.tmpdir=%s"%self.parent.output) 401 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', 'ap-generate-project-init-list.jar')]) 402 arguments.append(path) 403 if properties: 404 assert isinstance(properties, basestring) 405 arguments.append(os.path.join(self.parent.output, properties)) 406 if correlatorName or ('!' not in path): 407 arguments.extend(['--correlatorName', correlatorName or self.name]) 408 arguments.extend(['--inject', self.host, str(self.port)]) 409 if include: 410 assert isinstance(include, basestring) 411 arguments.extend(['--include', include]) 412 if exclude: 413 assert isinstance(exclude, basestring) 414 arguments.extend(['--exclude', exclude]) 415 arguments.extend(xargs.arguments) 416 417 # start the process to generate the EPL 418 try: 419 status = self.parent.startProcess(command, arguments, self.environ, 420 displayName='initialize <%s> [%s]'%(self.name, os.path.basename(path)), **xargs.kwargs) 421 finally: 422 self.parent.logFileContents(xargs.stderr, maxLines=50) 423 self.parent.logFileContents(xargs.stdout, maxLines=50) 424 # in case there are queries involved 425 return status
426 427
428 - def injectEPL(self, filenames=[], filedir=None, utf8=False, **xargs):
429 """Inject EPL *.mon files into the Correlator. 430 431 See also L{initialize}. 432 433 @param filenames: List of the basename of EPL files to inject into the Correlator 434 @param filedir: Directory containing the input EPL files (defaults to testcase input directory) 435 @param utf8: Assume input is in UTF8 436 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 437 438 """ 439 return self.__inject(filenames, filedir, utf8, False, False, **xargs)
440
441 - def injectJava(self, filename, filedir=None, **xargs):
442 """Inject a Java plug-in or application into the Correlator. 443 444 See also L{initialize}. 445 446 @param filename: The basename of the jar file to inject into the Correlator 447 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 448 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 449 450 """ 451 return self.__inject([filename], filedir, False, True, False, **xargs)
452 453
454 - def injectCDP(self, filenames=[], filedir=None, **xargs):
455 """Inject Correlator deployment package into the Correlator. 456 457 See also L{initialize}. 458 459 @param filenames: List of the basename of cdp files to inject into the Correlator 460 @param filedir: Directory containing the input cdp files (defaults to testcase input directory) 461 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 462 463 """ 464 return self.__inject(filenames, filedir, False, False, True, **xargs)
465 466
467 - def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs):
468 """Inject a Scenario into the Correlator. 469 470 See also L{initialize}. 471 472 @param filename: The basename of the scenario definition file to inject into the Correlator 473 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 474 @param debug: Generate debug EPL 475 @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location 476 @param functions: Sequence of tuples, where each tuple contains a function catalog name and location 477 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 478 479 """ 480 # set the command and display name 481 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") 482 jarbase = "ap-modeler" 483 484 # set the default stdout and stderr 485 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase) 486 487 # transform xargs into an instance of the xargs holder class 488 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 489 490 # creae the scenario manager configuration file 491 self.__createScenarioManagerConfig(blocks, functions) 492 493 # set the input and output names for sdf and mon 494 monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon") 495 if not filedir: filedir = self.parent.input 496 sdfName = os.path.join(filedir, filename) 497 498 # set arguments to the process 499 arguments = [] 500 arguments.append("-Djava.awt.headless=true") 501 arguments.append("-DAPAMA_LOG_IMPL=simple") 502 arguments.append("-DAPAMA_LOG_LEVEL=INFO") 503 504 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) 505 arguments.extend(["-c", os.path.join(self.parent.output, "sm-config.xml")]) 506 if not debug: arguments.extend(["-XgenerateDebug", "false"]) 507 arguments.extend(["-Xgenerate", sdfName, monitorName]) 508 arguments.extend(xargs.arguments) 509 510 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) 511 512 # start the process to generate the EPL 513 status = self.parent.startProcess(command, arguments, self.environ, 514 displayName='scenario generation for %s'%os.path.basename(sdfName), **xargs.kwargs) 515 516 # if successful inject the EPL 517 if status and status.exitStatus == 0: 518 status = self.__inject([monitorName]) 519 if status and status.exitStatus == 0: 520 # delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc 521 os.remove(monitorName) 522 if status and status.exitStatus != 0 and not ignoreExitStatus: 523 self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
524
525 - def injectQuery(self, filename, filedir=None, diagnostics=False, **xargs):
526 """Inject a Query into the Correlator. 527 528 See also L{initialize}. 529 530 @param filename: The basename of the query file to inject into the Correlator 531 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 532 @param diagnostics: Enable runtime diagnostic logging in the query 533 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 534 535 """ 536 # set the command and display name 537 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") 538 jarbase = "ap-query-codegen" 539 540 # set the default stdout and stderr 541 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase) 542 543 # transform xargs into an instance of the xargs holder class 544 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 545 546 # set the input and output names for qry and mon 547 monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon") 548 if not filedir: filedir = self.parent.input 549 qryName = os.path.join(filedir, filename) 550 551 # set arguments to the process 552 arguments = [] 553 arguments.append("-Djava.awt.headless=true") 554 arguments.append("-DAPAMA_LOG_IMPL=simple") 555 arguments.append("-DAPAMA_LOG_LEVEL=INFO") 556 557 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) 558 arguments.extend(["--host", self.host, "--port", '%s' % self.port, qryName, monitorName]) 559 if diagnostics: arguments.extend(["--diagnostics"]) 560 arguments.extend(xargs.arguments) 561 562 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) 563 564 # start the process to generate the EPL 565 status = self.parent.startProcess(command, arguments, self.environ, 566 displayName='query generation for %s'%os.path.basename(qryName), **xargs.kwargs) 567 568 # if successful, inject the EPL 569 if status and (status.exitStatus==0): 570 status = self.__inject([monitorName]) 571 if status and status.exitStatus==0: 572 # delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc 573 try: 574 os.remove(monitorName) 575 except Exception, e: 576 self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e) 577 self.flush(count=6) 578 if status and status.exitStatus != 0 and not ignoreExitStatus: 579 self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
580 581
582 - def sendEventStrings(self, *eventStrings, **xargs):
583 """Send one or more event strings into the Correlator. 584 585 This method writes a temporary file containing the specified strings. 586 587 See the documentation for engine_send for more information. 588 589 @param eventStrings: One or more event strings to be sent to this correlator. 590 May be unicode or byte strings. May include a channel designator. 591 592 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 593 594 @keyword channel: The channel to which events are to be sent except when specified 595 on a per-event basis. If a channel is not specified for 596 an event and you do not specify this option, the event is delivered to the 597 default channel, which means the event will go to all public contexts. 598 599 """ 600 # set the command and display name 601 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send') 602 603 # set the default stdout and stderr 604 dstdout,dstderr = self.parent.allocateUniqueStdOutErr('send') 605 606 assert eventStrings # must not be empty 607 assert not isinstance(eventStrings, basestring) # should be a list of strings, not a string 608 609 # transform xargs into an instance of the xargs holder class 610 tmpfile = dstderr.replace('.err','')+'.tmp.evt' 611 with open(tmpfile, 'w') as f: 612 for l in eventStrings: 613 print >>f, l.strip().encode('utf-8') 614 615 displayName = "engine_send <%s> [%s%s]"%(self.name, eventStrings[0], ' (+%d others)'%(len(eventStrings)-1) if len(eventStrings)>1 else '') 616 617 # set the arguments to the process 618 arguments = [] 619 arguments.extend(["-p", "%d" % self.port]) 620 if self.host: arguments.extend(["-n", self.host]) 621 arguments.append('--utf8') 622 if xargs.get('channel',''): arguments.extend(["--channel", xargs.pop('channel')]) 623 arguments.append(tmpfile) 624 625 kwargs = xargs 626 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 627 arguments.extend(xargs.arguments) 628 629 # start the process 630 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
631 632 633
634 - def send(self, filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs):
635 """Send events from one or more file into the Correlator. 636 637 See the documentation for engine_send for more information. 638 639 @param filenames: List of the basename of event files to send into the Correlator 640 @param filedir: Directory containing the input event files (defaults to testcase input directory) 641 @param loop: Number of times to loop through the input file 642 @param utf8: Assume input is in UTF8 643 @param channel: The channel to which events are to be sent except when specified 644 on a per-event basis. If a channel is not specified for 645 an event and you do not specify this option, the event is delivered to the 646 default channel, which means the event will go to all public contexts. 647 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 648 649 """ 650 # set the command and display name 651 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send') 652 653 # set the default stdout and stderr 654 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'send') 655 656 # transform xargs into an instance of the xargs holder class 657 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 658 659 # set location of input files (defaults to input directory) 660 files=[] 661 if not filedir: filedir = self.parent.input 662 if type(filenames) is types.ListType: 663 for file in filenames: files.append(os.path.join(filedir, file)) 664 elif isinstance(filenames, basestring): 665 files.append(os.path.join(filedir, filenames)) 666 else: 667 raise Exception("Input parameter for filenames is not a string or list type") 668 669 displayName = "engine_send <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files))) 670 671 # set the arguments to the process 672 arguments = [] 673 arguments.extend(["-p", "%d" % self.port]) 674 if self.host: arguments.extend(["-n", self.host]) 675 if loop: arguments.extend(["--loop", "%s" % loop]) 676 if utf8: arguments.append("--utf8") 677 if channel: arguments.extend(["--channel", channel]) 678 arguments.extend(xargs.arguments) 679 arguments.extend(files) 680 681 # start the process 682 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
683 684
685 - def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
686 """Delete named objects from the Event Crrelator. 687 688 @param names: List of names to delete from the Correlator 689 @param filename: The basename of a file containing a set of names to delete 690 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 691 @param force: Force deletion of names even if they are in use 692 @param kill: Kill name even if it is a running monitor 693 @param all: Delete everything in the Correlator 694 @param utf8: Assume input is in UTF8 695 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 696 697 """ 698 # set the command and display name 699 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_delete') 700 displayName = "engine_delete <%s> %s"%(self.name, '<all>' if all else '[%s]'%(' '.join(names))) 701 702 # set the default stdout and stderr 703 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'delete') 704 705 # transform xargs into an instance of the xargs holder class 706 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 707 708 # set location of input files (defaults to input directory) 709 if not filedir and filename: 710 filedir = self.parent.input 711 filename = os.path.join(filedir, filename) 712 713 # set the arguments to the process 714 arguments = [] 715 arguments.extend(["-p", "%d" % self.port]) 716 if self.host: arguments.extend(["-n", self.host]) 717 if filename: arguments.extend(["-f", filename]) 718 if force: arguments.append("--force") 719 if kill: arguments.append("--kill") 720 if all: arguments.extend(["--all","--yes"]) 721 if utf8: arguments.append("--utf8") 722 arguments.extend(xargs.arguments) 723 if names: arguments.extend(names) 724 725 # start the process 726 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
727 728
729 - def connect(self, source, channel=None, channels=None, mode=None, **xargs):
730 """Connect a Correlator to this instance as a source. 731 732 @param source: An instance of the L{CorrelatorHelper} class to act as the source 733 @param channel: The channel to make the connection on 734 @param channels: The list of channels to make the connection on 735 @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel 736 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 737 738 """ 739 # set the command and display name 740 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_connect') 741 displayName = "engine_connect %s<%s channel '%s' -> %s>"%( 742 'disconnect ' if ('-x' in xargs.get('arguments',[]) or '--disconnect' in xargs.get('arguments',[])) else '', 743 source.name, channel or '*', self.name) 744 745 # set the default stdout and stderr 746 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'connect') 747 748 # transform xargs into an instance of the xargs holder class 749 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 750 751 # set the arguments to the process 752 arguments = [] 753 arguments.extend(["-sn", source.host]) 754 arguments.extend(["-sp", "%d" % source.port]) 755 arguments.extend(["-tn", self.host]) 756 arguments.extend(["-tp", "%d" % self.port]) 757 if channel: arguments.extend(["-c", channel]) 758 if channels: 759 assert not isinstance(channels, basestring) # should be a list of strings, not a string 760 for c in channels: 761 arguments.extend(["-c", c]) 762 if mode: arguments.extend(["-m", mode]) 763 arguments.extend(xargs.arguments) 764 765 # start the process 766 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
767 768
769 - def disconnect(self, source, channel=None, channels=None, mode=None, **xargs):
770 """Disconnect a correlator to this instance as a source correlator. 771 772 @param source: An instance of the L{CorrelatorHelper} class acting as the source 773 @param channel: The channel to be disconnected 774 @param channels: The list of channels to be disconnected 775 @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel 776 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 777 778 """ 779 if xargs.has_key('arguments'): 780 if xargs['arguments'].count('-x') == 0 and xargs['arguments'].count('--disconnect') == 0: 781 xargs['arguments'].append('-x') 782 else: 783 xargs['arguments'] = ['-x'] 784 self.connect(source, channel, channels, mode, **xargs)
785 786
787 - def applicationEventLogging(self, enable=True, **xargs):
788 """Enable and disable application event logging. 789 790 Provides a wrapper around the engine_management command line tool to enable and disable 791 application event logging. Once enabled, application event logging will log to the correlator 792 log file information specific processing occurrences, e.g. the receipt of events for 793 processing, the triggering of listeners, execution of the garbage collector etc. 794 795 @param enable: Set to True to enable, set to False to disable event logging 796 797 """ 798 if enable: 799 self.manage(arguments=['-r', 'applicationEventLogging', 'on'], **xargs) 800 else: 801 self.manage(arguments=['-r', 'applicationEventLogging', 'off'], **xargs)
802 803
804 - def setApplicationLogFile(self, filename=None, filedir=None, **xargs):
805 """Set the application log file name. 806 807 On setting the application log file details, the output of all native log commands within 808 EPL will be logged to the designated log file. This allows separation between the 809 log statements written by the Event Correlator i.e. for status, errors etc, and those 810 generated by the actual application. 811 812 @param filename: The basename of the file to write the application log file to 813 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 814 815 """ 816 if not filedir: filedir = self.parent.output 817 self.manage(arguments=['-r', 'setApplicationLogFile', os.path.join(filedir, filename)], **xargs)
818 819
820 - def setApplicationLogLevel(self, verbosity, **xargs):
821 """Set the application log level. 822 823 @param verbosity: The verbosity level of the application logging 824 825 """ 826 self.manage(arguments=['-r', 'setApplicationLogLevel', verbosity], **xargs)
827 828
829 - def profilingOn(self, **xargs):
830 """Inform the Event Correlator to start collecting profiling statistics. """ 831 self.manage(arguments=['-r', 'cpuProfile', 'on'], **xargs)
832 833
834 - def profilingOff(self, **xargs):
835 """Inform the Event Correlator to stop collecting profiling statistics. """ 836 self.manage(arguments=['-r', 'cpuProfile', 'off'], **xargs)
837 838
839 - def profilingReset(self, **xargs):
840 """Inform the Event Correlator to reset it's collection of profiling statistics. """ 841 self.manage(arguments=['-r', 'cpuProfile', 'reset'], **xargs)
842 843
844 - def profilingGet(self, filename, filedir=None, **xargs):
845 """Obtain the latest profiling statistics from the Event Correlator. 846 847 @param filename: The basename of the file to write the profiling statistics to 848 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 849 850 """ 851 if not filedir: filedir = self.parent.output 852 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','cpuProfile', 'get'], **xargs)
853 854
855 - def toStringAll(self, filename, filedir=None, **xargs):
856 """Obtain a stringified representation of the current application state from the Event Correlator. 857 858 @param filename: The basename of the file to write the dump of application state to 859 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 860 861 """ 862 if not filedir: filedir = self.parent.output 863 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'], **xargs)
864 865
866 - def waitForCorrelatorUp(self, *args, **kwargs):
867 """Block until the Correlator declares itself to be ready for processing. 868 869 @deprecated: Use waitForComponentUp instead. 870 """ 871 self.waitForComponentUp(*args, **kwargs)
872 873
874 - def flush(self, timeout=60, count=1, **xargs):
875 """Make sure all events have been flushed through the correlator. 876 877 Currently implemented by using the flushAllQueues management request. 878 Will initate a cycle where each queue in the correlator is drained, 879 optionally repeated count times. This is useful when you have a 880 multi-context application. 881 882 @param timeout: The amount of time to wait 883 @param count: The number of times to ensure queues are flushed 884 885 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir 886 887 """ 888 flushAllQueues = 'flushAllQueues' 889 if count > 1: 890 self.manage(arguments=['-r', 'flushAllQueues', str(count)], timeout=timeout, **xargs) 891 else: 892 self.manage(arguments=['-r', 'flushAllQueues'], timeout=timeout, **xargs)
893 894
895 - def __inject(self, filenames=[], filedir=None, utf8=False, java=False, cdp=False, **xargs):
896 """Inject an application into the correlator. 897 898 Returns the process object (or None in some error cases) 899 900 """ 901 # set the command and display name 902 command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_inject') 903 904 # set the default stdout and stderr 905 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'inject') 906 907 # transform xargs into an instance of the xargs holder class 908 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) 909 910 # set location of input files (defaults to input directory) 911 files=[] 912 if not filedir: filedir = self.parent.input 913 if type(filenames) is types.ListType: 914 for file in filenames: files.append(os.path.join(filedir, file)) 915 elif isinstance(filenames, basestring): 916 files.append(os.path.join(filedir, filenames)) 917 else: 918 raise Exception("Input parameter for filenames is not a string or list type") 919 displayName = "engine_inject <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files))) 920 921 # set the arguments to the process 922 arguments = [] 923 arguments.extend(["-p", "%d" % self.port]) 924 if self.host: arguments.extend(["-n", self.host]) 925 if utf8: arguments.append("--utf8") 926 if java: arguments.append("--java") 927 if cdp: arguments.append("--cdp") 928 arguments.extend(xargs.arguments) 929 arguments.extend(files) 930 931 # remove from xargs so we don't pass it to startProcess directly 932 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) 933 934 # start the process 935 result = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs) 936 firstwarningline = '' 937 if result and result.exitStatus != 0: 938 with open(xargs.stderr) as f: 939 for l in f: 940 l = l.replace(self.parent.input+os.sep,'').strip() 941 if not l: continue 942 if not firstwarningline: firstwarningline = l 943 self.parent.log.warning(' %s'%l) 944 if result and result.exitStatus != 0 and not ignoreExitStatus: 945 self.parent.addOutcome(BLOCKED, 946 '%s failed: %s'%(result, firstwarningline) if firstwarningline else '%s returned non-zero exit code %d'%(result, result.exitStatus), 947 abortOnError=self.parent.defaultAbortOnError) 948 949 return result
950 951
952 - def __createScenarioManagerConfig(self, blocks=None, functions=None):
953 """Create the Scenario Manager configuration file for the location of all blocks and functions. 954 955 """ 956 impl = getDOMImplementation() 957 document = impl.createDocument(None, "config", None) 958 rootElement = document.documentElement 959 960 # create the catalogs entry 961 catalogsElement = document.createElement("catalogs") 962 rootElement.appendChild(catalogsElement) 963 964 # create the blocks element 965 blocksElement = document.createElement("blocks") 966 967 try: 968 try: 969 if self.parent.project.BLOCKS_DIR == "": 970 raise Exception("") 971 except Exception: 972 self.parent.project.BLOCKS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "blocks") 973 except AttributeError: 974 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT") 975 976 tup = [("blocks", self.parent.project.BLOCKS_DIR)] 977 if blocks: tup.extend(blocks) 978 979 for name, location in tup: 980 catalog = document.createElement("catalog") 981 nameAtttribute = document.createAttribute("name") 982 nameAtttribute.value=name 983 typeAtttribute = document.createAttribute("type") 984 typeAtttribute.value="FILE" 985 locationAtttribute = document.createAttribute("location") 986 locationAtttribute.value=location 987 catalog.setAttributeNode(nameAtttribute) 988 catalog.setAttributeNode(typeAtttribute) 989 catalog.setAttributeNode(locationAtttribute) 990 blocksElement.appendChild(catalog) 991 catalogsElement.appendChild(blocksElement) 992 993 # create the functions elememt 994 functionsElement = document.createElement("functions") 995 996 try: 997 try: 998 if self.parent.project.FUNCTIONS_DIR == "": 999 raise Exception("") 1000 except Exception: 1001 self.parent.project.FUNCTIONS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "functions") 1002 except AttributeError: 1003 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT") 1004 1005 tup = [("functions", self.parent.project.FUNCTIONS_DIR)] 1006 if functions: tup.extend(functions) 1007 1008 for name, location in tup: 1009 catalog = document.createElement("catalog") 1010 nameAtttribute = document.createAttribute("name") 1011 nameAtttribute.value=name 1012 typeAtttribute = document.createAttribute("type") 1013 typeAtttribute.value="FILE" 1014 locationAtttribute = document.createAttribute("location") 1015 locationAtttribute.value=location 1016 catalog.setAttributeNode(nameAtttribute) 1017 catalog.setAttributeNode(typeAtttribute) 1018 catalog.setAttributeNode(locationAtttribute) 1019 functionsElement.appendChild(catalog) 1020 catalogsElement.appendChild(functionsElement) 1021 1022 fp = open(os.path.join(self.parent.output, "sm-config.xml"), "w") 1023 fp.write(document.toprettyxml(indent=" ")) 1024 fp.close()
1025