Package apama :: Module correlator
[hide private]
[frames] | no frames]

Source Code for Module apama.correlator

  1  #!/usr/bin/env python 
  2  # Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights 
  3  # Copyright (c) 2013 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its Subsidiaries and or/its Affiliates and/or their licensors. 
  4  # Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG 
  5   
  6  import sys, os, string, logging, socket, copy, random, types 
  7   
  8  from pysys import log, ThreadFilter 
  9  from pysys.constants import * 
 10  from pysys.exceptions import * 
 11  from pysys.process.helper import ProcessWrapper 
 12   
 13  from apama.common import XArgsHolder 
 14  from apama.common import stringToUnicode 
 15   
 16  from xml.dom.minidom import getDOMImplementation 
 17   
 18  # Make a check on the project variables this helper requires. 
 19  for variable in ("APAMA_COMMON_JRE", "APAMA_LIBRARY_VERSION"): 
 20          try: 
 21                  if getattr(PROJECT, variable) == "": 
 22                          raise Exception("Project variable %s not set within the environment" % variable) 
 23          except AttributeError: 
 24                  raise Exception("Project variable %s undefined in .pysysproject" % variable) 
 25   
 26  try: 
 27          try: 
 28                  if PROJECT.APAMA_BIN_DIR == "": 
 29                          raise Exception("") 
 30          except Exception: 
 31                  PROJECT.APAMA_BIN_DIR = os.path.join(PROJECT.APAMA_HOME, 'bin') 
 32  except AttributeError: 
 33          raise Exception("Not running inside an Apama environment. APAMA_HOME and APAMA_WORK not set") 
 34   
35 -class CorrelatorHelper:
36 """Helper class for the Software AG Apama Event Correlator. 37 38 The Correlator Helper class has been designed for use as an extension module to the PySys System Test 39 Framework, offering the ability to configure, start and interact with an Event Correlator. The usage 40 pattern of the class is to create an instance per Correlator, and to then make method calls onto the 41 instance to perform operations such as the injection of monitorscript or java JMON applications, the 42 sending of events, deletion of named objects etc. For example:: 43 44 correlator = CorrelatorHelper(self) 45 correlator.start(logfile="correlator.log") 46 correlator.inject(filenames=["simple.mon"]) 47 48 Process related methods of the class declare a method signature which includes named parameters for the 49 most frequently used options to the method. They also declare the **xargs parameter to allow passing in 50 of additional supported arguments to the process. The additional arguments that are currently supported 51 via **xargs are:: 52 53 workingDir: The default value for the working directory of a process 54 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND) 55 timeout: The default value of the process timeout 56 stdout: The default value of the process stdout 57 stderr: The default value of the process stderr 58 arguments: List of extra arguments to be passed to the process 59 60 This means that legitimate calls to the start method include:: 61 62 correlator.start(logfile="correlator.log") 63 correlator.start(logfile="correlator.log", stdout="correlator1.out") 64 correlator.start(state=FOREGROUND, timeout=5) 65 66 67 @ivar parent: Reference to the PySys testcase instantiating this class instance 68 @type parent: pysys.basetest 69 @ivar port: Port used for starting and interaction with the Correlator 70 @type port: integer 71 @ivar host: Hostname for interaction with a remote Correlator 72 @type host: string 73 @ivar environ: The environment for running the Correlator 74 @type environ: dictionary 75 76 """ 77
78 - def __init__(self, parent, port=None, host=None):
79 """Create an instance of the CorrelatorHelper class. 80 81 If no port parameter is used in the argument list an available port will be dynamically found from 82 the OS and used for starting the Correlator, and performing all operations against it. The host 83 parameter is only used to perform operations against a remote Correlator started external to the 84 PySys framework - the class does not support the starting of a Correlator remote to the localhost. 85 86 @param parent: Reference to the parent PySys testcase 87 @param port: The port used for starting and interacting with the Correlator 88 @param host: The hostname used for interaction with a remote Correlator 89 90 91 """ 92 self.parent = parent 93 self.port = port 94 self.host = host 95 if self.port == None: self.port = parent.getNextAvailableTCPPort() 96 if self.host == None: self.host = "localhost" 97 self.environ = {} 98 for key in os.environ: self.environ[stringToUnicode(key)] = stringToUnicode(os.environ[key])
99 100
101 - def addToClassPath(self, path):
102 """Add the supplied path to the CLASSPATH environment variable for starting this instance. 103 104 """ 105 if self.environ.has_key("CLASSPATH"): 106 self.environ["CLASSPATH"] = r"%s%s%s" % (self.environ["CLASSPATH"], ENVSEPERATOR, os.path.normpath(path)) 107 else: 108 self.environ["CLASSPATH"] = os.path.normpath(path)
109 110
111 - def addToPath(self, path):
112 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance. 113 114 """ 115 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" 116 else: key = "PATH" 117 118 if self.environ.has_key(key): 119 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path)) 120 else: 121 self.environ[key] = os.path.normpath(path)
122 123
124 - def start(self, logfile=None, verbosity=None, java=None, Xclock=None, environ=None, **xargs):
125 """Start the Correlator. 126 127 @param logfile: Name of the Correlator log file 128 @param verbosity: The verbosity level of the Correlator logging 129 @param java: If pysys.constants.False then the Correlator will be started with support for JMON applications 130 @param Xclock: If pysys.constants.True then the Correlator will be started in externally clocked mode 131 @param xargs: Variable argument list for the additional supported process parameters 132 133 """ 134 # set the command and display name 135 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'correlator') 136 displayName = "correlator" 137 138 # set the default stdout and stderr 139 instances = self.parent.getInstanceCount(displayName) 140 dstdout = os.path.join(self.parent.output, 'correlator.out') 141 dstderr = os.path.join(self.parent.output, 'correlator.err') 142 if instances: dstdout = "%s.%d" % (dstdout, instances) 143 if instances: dstderr = "%s.%d" % (dstderr, instances) 144 145 # transform xargs into an instance of the xargs holder class 146 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr) 147 148 # set the licence file location 149 licence = os.path.join(PROJECT.APAMA_WORK, 'license', 'license.txt') 150 if not os.path.exists(licence): 151 # Don't put licence on the command line if it doesn't exist - run 152 # in evaluation mode 153 licence = None 154 155 # set the arguments to the process 156 arguments = [] 157 arguments.extend(["-p", "%d" % self.port]) 158 if licence is not None: arguments.extend(["-l", "%s" % licence]) 159 if logfile: arguments.extend(["-f", logfile]) 160 if verbosity: arguments.extend(["-v", verbosity]) 161 if java: arguments.append("--java") 162 if Xclock: arguments.append("-Xclock") 163 if xargs.arguments: arguments.extend(xargs.arguments) 164 if environ is not None: 165 for key in environ: 166 env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) 167 168 # start the process - unicoding environment needs moving into the PySys framework 169 env = {} 170 for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) 171 hprocess = self.parent.startProcess(command, arguments, env, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName) 172 self.waitForCorrelatorUp() 173 return hprocess
174 175
176 - def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=None, utf8=False, **xargs):
177 """Attach a receiver to the Correlator. 178 179 @param filename: The basename of the file to write events received from the Correlator to 180 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 181 @param channels: List of channel names to subscribe to 182 @param suppressBatch: Do not include BATCH timestamps in the output 183 @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived 184 @param utf8: Write output in UTF8 185 @param xargs: Variable argument list for the additional supported process parameters 186 187 """ 188 # set the command and display name 189 command = os.path.join(PROJECT.APAMA_BIN_DIR,'engine_receive') 190 displayName = "engine_receive" 191 192 # set the default stdout and stderr 193 instances = self.parent.getInstanceCount(displayName) 194 dstdout = os.path.join(self.parent.output, 'receive.out') 195 dstderr = os.path.join(self.parent.output, 'receive.err') 196 if instances: dstdout = "%s.%d" % (dstdout, instances) 197 if instances: dstderr = "%s.%d" % (dstderr, instances) 198 199 # transform xargs into an instance of the xargs holder class 200 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr) 201 202 # set location of output file (defaults to output subdirectory) 203 if not filedir: filedir = self.parent.output 204 file = os.path.join(filedir, filename) 205 206 # set the arguments to the process 207 arguments = [] 208 arguments.extend(["-p", "%d" % self.port]) 209 if self.host: arguments.extend(["-n", self.host]) 210 if filename: arguments.extend(["-f", file]) 211 if suppressBatch: arguments.append("--suppressbatch") 212 if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch") 213 if utf8: arguments.append("--utf8") 214 if xargs.arguments: arguments.extend(xargs.arguments) 215 for channel in channels: 216 arguments.extend(['--channel', channel]) 217 218 # start the process 219 proc = self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName) 220 self.parent.waitForFile(filename, filedir = filedir) 221 return proc
222 223
224 - def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
225 """Obtain runtime operational status from the Correlator. 226 227 @param filename: The basename of the file to write the runtime operational status to 228 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 229 @param raw: Obtain csv format data when logging to file 230 @param interval: The polling interval (seconds) between logging to file 231 @param xargs: Variable argument list for the additional supported process parameters 232 233 Note that when outputing data in the raw (csv) format, the column identifiers and their positions 234 are defined by:: 235 236 Uptime = 0 237 Number of monitors = 1 238 Number of mthreads = 2 239 Number of java applications = 3 240 Number of listeners = 4 241 Number of sub-listeners = 5 242 Number of event types = 6 243 Number of events on input queue = 7 244 Number of events received = 8 245 Number of events on the route queue = 9 246 Number of events routed = 10 247 Number of attached consumers = 11 248 Number of events on output queue = 12 249 Number of output events created = 13 250 Number of output events sent = 14 251 Number of events processed = 15 252 253 """ 254 # set the command and display name 255 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_watch') 256 displayName = "engine_watch" 257 258 # set the default stdout and stderr 259 instances = self.parent.getInstanceCount(displayName) 260 dstdout = os.path.join(self.parent.output, 'watch.out') 261 dstderr = os.path.join(self.parent.output, 'watch.err') 262 if instances: dstdout = "%s.%d" % (dstdout, instances) 263 if instances: dstderr = "%s.%d" % (dstderr, instances) 264 265 # transform xargs into an instance of the xargs holder class 266 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr) 267 268 # set location of output file (defaults to output subdirectory) 269 if not filedir: filedir = self.parent.output 270 file = os.path.join(filedir, filename) 271 272 # set the arguments to the process 273 arguments = [] 274 arguments.extend(["-p", "%d" % self.port]) 275 if self.host: arguments.extend(["-n", self.host]) 276 if filename: arguments.extend(["-f", file]) 277 if raw: arguments.append("--raw") 278 if interval: arguments.extend(["-i", "%d" % interval]) 279 if xargs.arguments: arguments.extend(xargs.arguments) 280 281 # start the process 282 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
283 284
285 - def injectMonitorscript(self, filenames=[], filedir=None, utf8=False, **xargs):
286 """Inject MonitorScript into the Correlator. 287 288 @param filenames: List of the basename of monitorscript files to inject into the Correlator 289 @param filedir: Directory containing the input monitorscript files (defaults to testcase input directory) 290 @param utf8: Assume input is in UTF8 291 @param xargs: Variable argument list for the additional supported process parameters 292 293 """ 294 return self.__inject(filenames, filedir, utf8, False, **xargs)
295 296
297 - def injectJMON(self, filename, filedir=None, **xargs):
298 """Inject a JMON java application into the Correlator. 299 300 @param filename: The basename of the jar file to inject into the Correlator 301 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 302 @param xargs: Variable argument list for the additional supported process parameters 303 304 """ 305 return self.__inject([filename], filedir, False, True, **xargs)
306 307
308 - def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs):
309 """Inject a Scenario into the Correlator. 310 311 @param filename: The basename of the scenario definition file to inject into the Correlator 312 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 313 @param debug: Generate debug monitorscript 314 @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location 315 @param functions: Sequence of tuples, where each tuple contains a function catalog name and location 316 @param xargs: Variable argument list for the additional supported process parameters 317 318 """ 319 # set the command and display name 320 command = os.path.join(PROJECT.APAMA_COMMON_JRE, "bin", "java") 321 jarbase = "event_modeler" 322 displayName = "%s.jar" % jarbase 323 324 # set the default stdout and stderr 325 instances = self.parent.getInstanceCount(displayName) 326 dstdout = os.path.join(self.parent.output, "%s.out" % jarbase) 327 dstderr = os.path.join(self.parent.output, "%s.err" % jarbase) 328 if instances: dstdout = "%s.%d" % (dstdout, instances) 329 if instances: dstderr = "%s.%d" % (dstderr, instances) 330 331 # transform xargs into an instance of the xargs holder class 332 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 333 334 # creae the scenario manager configuration file 335 self.__createScenarioManagerConfig(blocks, functions) 336 337 # set the input and output names for sdf and mon 338 root, ext = os.path.splitext(filename) 339 monitorName = os.path.join(self.parent.output, root + ".mon") 340 if not filedir: filedir = self.parent.input 341 sdfName = os.path.join(filedir, filename) 342 343 # set arguments to the process 344 arguments = [] 345 arguments.append("-Djava.awt.headless=true") 346 arguments.append("-DAPAMA_LOG_IMPL=simple") 347 arguments.append("-DAPAMA_LOG_LEVEL=DEBUG") 348 349 arguments.extend(["-jar", os.path.join(PROJECT.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) 350 arguments.extend(["-c", os.path.join(self.parent.output, "sm-config.xml")]) 351 if not debug: arguments.extend(["-XgenerateDebug", "false"]) 352 arguments.extend(["-Xgenerate", sdfName, monitorName]) 353 354 # start the process to generate the monitorscript 355 status = self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName) 356 357 # if successfull inject the monitorscript 358 if status: self.__inject([monitorName])
359 360
361 - def send(self, filenames=[], filedir=None, loop=None, utf8=False, doNoBatch=False, **xargs):
362 """Send events into the Correlator. 363 364 @param filenames: List of the basename of event files to send into the Correlator 365 @param filedir: Directory containing the input event files (defaults to testcase input directory) 366 @param loop: Number of times to loop through the input file 367 @param utf8: Assume input is in UTF8 368 @param doNoBatch: Do not automatically batch events 369 @param xargs: Variable argument list for the additional supported process parameters 370 371 """ 372 # set the command and display name 373 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_send') 374 displayName = "engine_send" 375 376 # set the default stdout and stderr 377 instances = self.parent.getInstanceCount(displayName) 378 dstdout = os.path.join(self.parent.output, 'send.out') 379 dstderr = os.path.join(self.parent.output, 'send.err') 380 if instances: dstdout = "%s.%d" % (dstdout, instances) 381 if instances: dstderr = "%s.%d" % (dstderr, instances) 382 383 # transform xargs into an instance of the xargs holder class 384 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 385 386 # set location of input files (defaults to input directory) 387 files=[] 388 if not filedir: filedir = self.parent.input 389 if type(filenames) is types.ListType: 390 for file in filenames: files.append(os.path.join(filedir, file)) 391 elif type(filenames) is types.StringType: 392 files.append(os.path.join(filedir, filenames)) 393 else: 394 self.parent.log.error("Input parameter for filenames is not a string or list type") 395 396 # set the arguments to the process 397 arguments = [] 398 arguments.extend(["-p", "%d" % self.port]) 399 if self.host: arguments.extend(["-n", self.host]) 400 if loop: arguments.extend(["--loop", "%s" % loop]) 401 if utf8: arguments.append("--utf8") 402 if doNoBatch: arguments.append("--doNoBatch") 403 if xargs.arguments: arguments.extend(xargs.arguments) 404 arguments.extend(files) 405 406 # start the process 407 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
408 409
410 - def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
411 """Delete named objects from the Event Crrelator. 412 413 @param names: List of names to delete from the Correlator 414 @param filename: The basename of a file containing a set of names to delete 415 @param filedir: The directory containing filename (defaults to testcase input subdirectory) 416 @param force: Force deletion of names even if they are in use 417 @param kill: Kill name even if it is a running monitor 418 @param all: Delete everything in the Correlator 419 @param utf8: Assume input is in UTF8 420 @param xargs: Variable argument list for the additional supported process parameters 421 422 """ 423 # set the command and display name 424 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_delete') 425 displayName = "engine_delete" 426 427 # set the default stdout and stderr 428 instances = self.parent.getInstanceCount(displayName) 429 dstdout = os.path.join(self.parent.output, 'delete.out') 430 dstderr = os.path.join(self.parent.output, 'delete.err') 431 if instances: dstdout = "%s.%d" % (dstdout, instances) 432 if instances: dstderr = "%s.%d" % (dstderr, instances) 433 434 # transform xargs into an instance of the xargs holder class 435 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 436 437 # set location of input files (defaults to input directory) 438 if not filedir and filename: 439 filedir = self.parent.input 440 filename = os.path.join(filedir, filename) 441 442 # set the arguments to the process 443 arguments = [] 444 arguments.extend(["-p", "%d" % self.port]) 445 if self.host: arguments.extend(["-n", self.host]) 446 if filename: arguments.extend(["-f", filename]) 447 if force: arguments.append("--force") 448 if kill: arguments.append("--kill") 449 if all: arguments.extend(["--all","--yes"]) 450 if utf8: arguments.append("--utf8") 451 if xargs.arguments: arguments.extend(xargs.arguments) 452 if names: arguments.extend(names) 453 454 # start the process 455 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
456 457
458 - def manage(self, **xargs):
459 """Execute management operations against the Correlator. 460 461 @param xargs: Variable argument list for the additional supported process parameters 462 463 """ 464 # set the command and display name 465 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_management') 466 displayName = "engine_management" 467 468 # set the default stdout and stderr 469 instances = self.parent.getInstanceCount(displayName) 470 dstdout = os.path.join(self.parent.output, 'manage.out') 471 dstderr = os.path.join(self.parent.output, 'manage.err') 472 if instances: dstdout = "%s.%d" % (dstdout, instances) 473 if instances: dstderr = "%s.%d" % (dstderr, instances) 474 475 # transform xargs into an instance of the xargs holder class 476 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 477 478 # set the arguments to the process 479 arguments = [] 480 arguments.extend(["-p", "%d" % self.port]) 481 if self.host: arguments.extend(["-n", self.host]) 482 if xargs.arguments: arguments.extend(xargs.arguments) 483 484 # start the process 485 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
486 487
488 - def connect(self, source, channel=None, **xargs):
489 """Connect a Correlator to this instance as a source. 490 491 @param source: An instance of the L{CorrelatorHelper} class to act as the source 492 @param channel: The channel to make the connection on 493 @param xargs: Variable argument list for the additional supported process parameters 494 495 """ 496 # set the command and display name 497 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_connect') 498 displayName = "engine_connect" 499 500 # set the default stdout and stderr 501 instances = self.parent.getInstanceCount(displayName) 502 dstdout = os.path.join(self.parent.output, 'connect.out') 503 dstderr = os.path.join(self.parent.output, 'connect.err') 504 if instances: dstdout = "%s.%d" % (dstdout, instances) 505 if instances: dstderr = "%s.%d" % (dstderr, instances) 506 507 # transform xargs into an instance of the xargs holder class 508 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 509 510 # set the arguments to the process 511 arguments = [] 512 arguments.extend(["-sn", source.host]) 513 arguments.extend(["-sp", "%d" % source.port]) 514 arguments.extend(["-tn", self.host]) 515 arguments.extend(["-tp", "%d" % self.port]) 516 if channel: arguments.extend(["-c", channel]) 517 if xargs.arguments: arguments.extend(xargs.arguments) 518 519 # start the process 520 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
521 522
523 - def disconnect(self, source, channel=None, **xargs):
524 """Disconnect a correlator to this instance as a source correlator. 525 526 @param source: An instance of the L{CorrelatorHelper} class acting as the source 527 @param channel: The channel to be disconnected 528 @param xargs: Variable argument list for the additional supported process parameters 529 530 """ 531 if xargs.has_key('arguments'): 532 if xargs['arguments'].count('-x') == 0 and xargs['arguments'].count('--disconnect') == 0: 533 xargs['arguments'].append('-x') 534 else: 535 xargs['arguments'] = ['-x'] 536 self.connect(source, channel, **xargs)
537 538
539 - def applicationEventLogging(self, enable=True):
540 """Enable and disable application event logging. 541 542 Provides a wrapper around the engine_management command line tool to enable and disable 543 application event logging. Once enabled, application event logging will log to the correlator 544 log file information specific processing occurrences, e.g. the receipt of events for 545 processing, the triggering of listeners, execution of the garbage collector etc. 546 547 @param enable: Set to True to enable, set to False to disable event logging 548 549 """ 550 if enable: 551 self.manage(arguments=['-r', 'applicationEventLogging on']) 552 else: 553 self.manage(arguments=['-r', 'applicationEventLogging off'])
554 555
556 - def setApplicationLogFile(self, filename=None, filedir=None):
557 """Set the application log file name. 558 559 On setting the application log file details, the output of all native log commands within 560 MonitorScript will be logged to the designated log file. This allows seperation between the 561 log statements written by the Event Correlator i.e. for status, errors etc, and those 562 generated by the actual application. 563 564 @param filename: The basename of the file to write the application log file to 565 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 566 567 """ 568 if not filedir: filedir = self.parent.output 569 self.manage(arguments=['-r', 'setApplicationLogFile %s' % os.path.join(filedir, filename)])
570 571
572 - def setApplicationLogLevel(self, verbosity):
573 """Set the application log level. 574 575 @param verbosity: The verbosity level of the application logging 576 577 """ 578 self.manage(arguments=['-r', 'setApplicationLogLevel %s' % verbosity])
579 580
581 - def profilingOn(self):
582 """Inform the Event Correlator to start collecting profiling statistics. """ 583 self.manage(arguments=['-r', 'profiling on'])
584 585
586 - def profilingOff(self):
587 """Inform the Event Correlator to stop collecting profiling statistics. """ 588 self.manage(arguments=['-r', 'profiling off'])
589 590
591 - def profilingReset(self):
592 """Inform the Event Correlator to reset it's collection of profiling statistics. """ 593 self.manage(arguments=['-r', 'profiling reset'])
594 595
596 - def profilingGet(self, filename, filedir=None):
597 """Obtain the latest profiling statistics from the Event Correlator. 598 599 @param filename: The basename of the file to write the profiling statistics to 600 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 601 602 """ 603 if not filedir: filedir = self.parent.output 604 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','profiling get'])
605 606
607 - def toStringAll(self, filename, filedir=None):
608 """Obtain a stringified representation of the current application state from the Event Correlator. 609 610 @param filename: The basename of the file to write the dump of application state to 611 @param filedir: The directory to write filename to (defaults to testcase output subdirectory) 612 613 """ 614 if not filedir: filedir = self.parent.output 615 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'])
616 617
618 - def waitForCorrelatorUp(self, timeout=20):
619 """Block until the Correlator declares itself to be ready for processing. 620 621 """ 622 # set the command and display name 623 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_management') 624 625 # set the default stdout and stderr 626 stdout = os.path.join(self.parent.output, 'engine_management.out') 627 stderr = os.path.join(self.parent.output, 'engine_management.err') 628 629 # set the arguments to the process 630 arguments = [] 631 arguments.append("-v") 632 arguments.append("-w") 633 arguments.extend(["-p", "%d" % self.port]) 634 arguments.extend(["-n", self.host]) 635 636 self.parent.log.info("Waiting for correlator to come up ...") 637 try: 638 process = ProcessWrapper(command, arguments, self.environ, self.parent.output, FOREGROUND, timeout, stdout, stderr) 639 process.start() 640 except ProcessError, e: 641 raise Exception("Unable to start engine_management to check for correlator up: %s"%e) 642 except ProcessTimeout: 643 process.stop() 644 raise Exception("Timed out after %d seconds waiting for correlator to come up"%timeout)
645 646
647 - def flush(self, timeout=60, count=1, **xargs):
648 """Make sure all events have been flushed through the correlator. 649 650 Currently implemented by using the flushAllQueues management request. 651 Will initate a cycle where each queue in the correlator is drained, 652 optionally repeated count times. This is useful when you have a 653 multi-context application. 654 655 @param timeout: The amount of time to wait 656 @param count: The number of times to ensure queues are flushed 657 658 @param xargs: Variable argument list for the additional supported process parameters 659 660 """ 661 flushAllQueues = 'flushAllQueues' 662 if count > 1: 663 flushAllQueues = 'flushAllQueues %d' % count 664 self.manage(arguments=['-r', flushAllQueues], timeout=timeout)
665 666
667 - def __inject(self, filenames=[], filedir=None, utf8=False, java=False, **xargs):
668 """Inject an application into the correlator. 669 670 """ 671 # set the command and display name 672 command = os.path.join(PROJECT.APAMA_BIN_DIR,'engine_inject') 673 displayName = "engine_inject" 674 675 # set the default stdout and stderr 676 instances = self.parent.getInstanceCount(displayName) 677 dstdout = os.path.join(self.parent.output, 'inject.out') 678 dstderr = os.path.join(self.parent.output, 'inject.err') 679 if instances: dstdout = "%s.%d" % (dstdout, instances) 680 if instances: dstderr = "%s.%d" % (dstderr, instances) 681 682 # transform xargs into an instance of the xargs holder class 683 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr) 684 685 # set location of input files (defaults to input directory) 686 files=[] 687 if not filedir: filedir = self.parent.input 688 if type(filenames) is types.ListType: 689 for file in filenames: files.append(os.path.join(filedir, file)) 690 elif type(filenames) is types.StringType: 691 files.append(os.path.join(filedir, filenames)) 692 else: 693 self.parent.log.error("Input parameter for filenames is not a string or list type") 694 695 # set the arguments to the process 696 arguments = [] 697 arguments.extend(["-p", "%d" % self.port]) 698 if self.host: arguments.extend(["-n", self.host]) 699 if utf8: arguments.append("--utf8") 700 if java: arguments.append("--java") 701 if xargs.arguments: arguments.extend(xargs.arguments) 702 arguments.extend(files) 703 704 # start the process 705 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
706 707
708 - def __createScenarioManagerConfig(self, blocks=None, functions=None):
709 """Create the Scenario Manager configuration file for the location of all blocks and functions. 710 711 """ 712 impl = getDOMImplementation() 713 document = impl.createDocument(None, "config", None) 714 rootElement = document.documentElement 715 716 # create the catalogs entry 717 catalogsElement = document.createElement("catalogs") 718 rootElement.appendChild(catalogsElement) 719 720 # create the blocks element 721 blocksElement = document.createElement("blocks") 722 723 try: 724 try: 725 if PROJECT.BLOCKS_DIR == "": 726 raise Exception("") 727 except Exception: 728 PROJECT.BLOCKS_DIR = os.path.join(PROJECT.APAMA_HOME, "catalogs", "blocks") 729 except AttributeError: 730 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT") 731 732 tup = [("blocks", PROJECT.BLOCKS_DIR)] 733 if blocks: tup.extend(blocks) 734 735 for name, location in tup: 736 catalog = document.createElement("catalog") 737 nameAtttribute = document.createAttribute("name") 738 nameAtttribute.value=name 739 typeAtttribute = document.createAttribute("type") 740 typeAtttribute.value="FILE" 741 locationAtttribute = document.createAttribute("location") 742 locationAtttribute.value=location 743 catalog.setAttributeNode(nameAtttribute) 744 catalog.setAttributeNode(typeAtttribute) 745 catalog.setAttributeNode(locationAtttribute) 746 blocksElement.appendChild(catalog) 747 catalogsElement.appendChild(blocksElement) 748 749 # create the functions elememt 750 functionsElement = document.createElement("functions") 751 752 try: 753 try: 754 if PROJECT.FUNCTIONS_DIR == "": 755 raise Exception("") 756 except Exception: 757 PROJECT.FUNCTIONS_DIR = os.path.join(PROJECT.APAMA_HOME, "catalogs", "functions") 758 except AttributeError: 759 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT") 760 761 tup = [("functions", PROJECT.FUNCTIONS_DIR)] 762 if functions: tup.extend(functions) 763 764 for name, location in tup: 765 catalog = document.createElement("catalog") 766 nameAtttribute = document.createAttribute("name") 767 nameAtttribute.value=name 768 typeAtttribute = document.createAttribute("type") 769 typeAtttribute.value="FILE" 770 locationAtttribute = document.createAttribute("location") 771 locationAtttribute.value=location 772 catalog.setAttributeNode(nameAtttribute) 773 catalog.setAttributeNode(typeAtttribute) 774 catalog.setAttributeNode(locationAtttribute) 775 functionsElement.appendChild(catalog) 776 catalogsElement.appendChild(functionsElement) 777 778 fp = open(os.path.join(self.parent.output, "sm-config.xml"), "w") 779 fp.write(document.toprettyxml(indent=" ")) 780 fp.close()
781