1
2
3
4
5
6 import sys, os, string, logging, socket, copy, random, types
7
8 from pysys import log, ThreadFilter
9 from pysys.constants import *
10 from pysys.exceptions import *
11
12 from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
13 from apama.common import XArgsHolder
14 from apama.common import stringToUnicode
15
16 from xml.dom.minidom import getDOMImplementation
17
18
20 """Helper class for the Software AG Apama Event Correlator.
21
22 The Correlator Helper class has been designed for use as an extension module to the PySys System Test
23 Framework, offering the ability to configure, start and interact with an Event Correlator. The usage
24 pattern of the class is to create an instance per Correlator, and to then make method calls onto the
25 instance to perform operations such as the injection of EPL or java JMON applications, the
26 sending of events, deletion of named objects etc. For example::
27
28 correlator = CorrelatorHelper(self, name='mycorrelator')
29 correlator.start(logfile="mycorrelator.log")
30 correlator.inject(filenames=["simple.mon"])
31
32 Process related methods of the class declare a method signature which includes named parameters for the
33 most frequently used options to the method. They also declare the **xargs parameter to allow passing in
34 of additional supported arguments to the process. The additional arguments that are currently supported
35 via **xargs are::
36
37 workingDir: The default value for the working directory of a process
38 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND)
39 timeout: The default value of the process timeout
40 stdout: The default value of the process stdout
41 stderr: The default value of the process stderr
42 arguments: List of extra arguments to be passed to the process
43
44 This means that legitimate calls to the start method include::
45
46 correlator.start(logfile="correlator.log")
47 correlator.start(logfile="correlator.log", stdout="correlator1.out")
48 correlator.start(state=FOREGROUND, timeout=5)
49
50
51 @ivar parent: Reference to the PySys testcase instantiating this class instance
52 @type parent: pysys.basetest
53 @ivar port: Port used for starting and interaction with the Correlator
54 @type port: integer
55 @ivar host: Hostname for interaction with a remote Correlator
56 @type host: string
57 @ivar environ: The environment for running the Correlator
58 @type environ: dictionary
59
60 """
61
62 """ Holds the Java classpath used when starting a correlator with JVM."""
63 classpath = None
64
65 licence = None
66
67
68 - def __init__(self, parent, port=None, host=None, name='correlator'):
69 """Create an instance of the CorrelatorHelper class.
70
71 If no port parameter is used in the argument list an available port will be dynamically found from
72 the OS and used for starting the Correlator, and performing all operations against it. The host
73 parameter is only used to perform operations against a remote Correlator started external to the
74 PySys framework - the class does not support the starting of a Correlator remote to the localhost.
75
76 @param parent: Reference to the parent PySys testcase
77 @param port: The port used for starting and interacting with the Correlator
78 @param host: The hostname used for interaction with a remote Correlator
79 @param name: A display name for this process (default is "correlator"), also used for the default
80 stdout/err filenames.
81
82 """
83 ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host)
84 self.injectJMON = self.injectJava
85 """
86 Alias for injectJava
87 @see: L{injectJava}
88 """
89 self.injectMon = self.injectEPL
90 """
91 Alias for injectEPL
92 @see: L{injectEPL}
93 """
94 self.injectMonitorscript = self.injectEPL
95 """
96 Alias for injectEPL
97 @see: L{injectEPL}
98 """
99
100 if self.environ.has_key("CLASSPATH"):
101 self.parent.log.warn('The CLASSPATH environment variable is set and will be passed to the correlator process - this behaviour is deprecated and will be removed in a future release so please ensure your test does not rely on this and unset CLASSPATH before invoking pysys if possible (CLASSPATH=%s)', self.environ["CLASSPATH"])
102 self.classpath = self.environ["CLASSPATH"]
103 """
104 Initialise the classpath to the environment CLASSPATH, if it exists.
105 This will be removed once the correlator is no longer allowed to use CLASSPATH.
106 """
107
108 self.licence = os.path.join(self.parent.project.APAMA_WORK, 'license', 'ApamaServerLicense.xml')
109 if not os.path.exists(self.licence):
110
111
112 self.licence = None
113 """ Set the licence file location. """
114
115
116
118 """Add the supplied path to the Java classpath variable for starting this instance.
119 """
120 if self.classpath == None:
121 self.classpath = os.path.normpath(path)
122 else:
123 self.classpath = r"%s%s%s" % (self.classpath, ENVSEPERATOR, os.path.normpath(path))
124
126 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance.
127
128 """
129 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH"
130 else: key = "PATH"
131
132 if self.environ.has_key(key):
133 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path))
134 else:
135 self.environ[key] = os.path.normpath(path)
136
137
138 - def start(self, logfile=None, verbosity=None, java=False, Xclock=None, environ=None, inputLog=None, waitForServerUp=True, config = None, **xargs):
139 """Start the Correlator.
140
141 @param logfile: Name of the Correlator log file (if used, set this to something similar to the display "name"
142 passed to the constructor)
143 @param verbosity: The verbosity level of the Correlator logging
144 @param java: If pysys.constants.False then the Correlator will be started with support for JMON applications
145 @param Xclock: If pysys.constants.True then the Correlator will be started in externally clocked mode
146 @param environ: Map of environment variables to override.
147 @param inputLog: Relative or absolute path of file to write input log to, containing all events, EPL and other
148 inputs sent to the correlator. The format of the input log may change without notice so should not be
149 replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the
150 input log can also be used to pass information to customer support in the event of a problem.
151 @param waitForServerUp: Set to False to disable automatically waiting until the component is ready
152 @param config: path or list of paths to a initialization or connectivity configuration file or directory
153 containing them
154 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
155
156 """
157
158 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'correlator')
159
160 dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name)
161
162
163 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project)
164
165
166 arguments = []
167 arguments.extend(["--name", self.name, "-p", "%d" % self.port])
168 if self.licence is not None: arguments.extend(["-l", "%s" % self.licence])
169 if logfile: arguments.extend(["-f", logfile])
170 if verbosity: arguments.extend(["-v", verbosity])
171 if java: arguments.append("--java")
172 if inputLog: arguments.extend(["--inputLog", os.path.join(self.parent.output, inputLog)])
173 if Xclock: arguments.append("-Xclock")
174 if config != None:
175 if isinstance(config, basestring):
176 arguments.extend(['--config', config])
177 elif type(config) is types.ListType:
178 for param in config:
179 arguments.extend(['--config', param])
180 else:
181 raise Exception("Input parameter for config is not a string or list type")
182 if java and self.classpath:
183 arguments.extend(["--javaopt", "-Djava.class.path=%s" % self.classpath])
184 arguments.extend(xargs.arguments)
185
186
187 env = {}
188 if getattr(self.parent, 'eplcoverage','').lower()=='true':
189 env['AP_EPL_COVERAGE_FILE'] = 'correlator.${PID}.eplcoverage'
190
191 for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key])
192 if environ:
193 for key in environ: env[stringToUnicode(key)] = stringToUnicode(environ[key])
194
195 if self.process and self.process.running():
196 raise Exception('Cannot start component as an instance is already running: %s'%self)
197
198 self.process = None
199 try:
200 hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs)
201 self.process = hprocess
202 if waitForServerUp:
203 self.waitForComponentUp(timeout=xargs.timeout)
204 except Exception:
205 for f in [logfile, xargs.stdout, xargs.stderr]:
206 if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*', 'Invalid Correlator argument.*'], tail=True): break
207 raise
208
209 return hprocess
210
211
212 - def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False,
213 logChannels=False, **xargs):
214 """Attach a receiver to the Correlator.
215
216 Returns the process for the receiver.
217
218 @param filename: The basename of the file to write events received from the Correlator to
219 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
220 @param channels: List of channel names to subscribe to
221 @param logChannels: Print the channel each event came from in the output
222 @param suppressBatch: Do not include BATCH timestamps in the output
223 @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived
224 @param utf8: Write output in UTF8
225 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
226
227 """
228
229 command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_receive')
230 displayName = 'engine_receive <%s> [%s]'%(self.name, os.path.basename(filename))
231
232
233 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'receive')
234
235
236 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)
237
238
239 if not filedir: filedir = self.parent.output
240 file = os.path.join(filedir, filename)
241
242
243 arguments = []
244 arguments.extend(["-p", "%d" % self.port])
245 if self.host: arguments.extend(["-n", self.host])
246 if filename: arguments.extend(["-f", file])
247 if suppressBatch: arguments.append("--suppressbatch")
248 if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch")
249 if utf8: arguments.append("--utf8")
250 if logChannels:
251 arguments.append("--logChannels")
252 arguments.extend(xargs.arguments)
253 for channel in channels:
254 arguments.extend(['--channel', channel])
255
256
257 proc = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
258 self.parent.waitForFile(filename, filedir = filedir)
259 return proc
260
261
262 - def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
263 """Obtain runtime operational statistics from the Correlator.
264
265 By default this runs as a BACKGROUND process. The process is returned by the method call.
266
267 @param filename: The basename of the file to write the runtime operational status to
268 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
269 @param raw: Obtain csv format data when logging to file
270 @param interval: The polling interval (seconds) between logging to file
271 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
272
273 @note: When outputing data in the raw (csv) format, the column identifiers and their positions
274 are defined by:
275 - Uptime = 0
276 - Number of monitors = 1
277 - Number of mthreads = 2
278 - Number of java applications = 3
279 - Number of listeners = 4
280 - Number of sub-listeners = 5
281 - Number of event types = 6
282 - Number of events on input queue = 7
283 - Number of events received = 8
284 - Number of events on the route queue = 9
285 - Number of events routed = 10
286 - Number of attached consumers = 11
287 - Number of events on output queue = 12
288 - Number of output events created = 13
289 - Number of output events sent = 14
290 - Number of events processed = 15
291
292 """
293
294 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_watch')
295
296
297 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'watch')
298
299 displayName = "engine_watch <%s> -> %s"%(self.name, os.path.basename(filename or dstdout))
300
301
302 xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)
303
304
305 if not filedir: filedir = self.parent.output
306
307
308 arguments = []
309 arguments.extend(["-p", "%d" % self.port])
310 if self.host: arguments.extend(["-n", self.host])
311 if filename: arguments.extend(["-f", os.path.join(filedir, filename)])
312 if raw: arguments.append("--raw")
313 if interval: arguments.extend(["-i", "%d" % interval])
314 arguments.extend(xargs.arguments)
315
316
317 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
318
319 - def inspect(self, filename='inspect.txt', filedir=None, raw=False, **xargs):
320 """Obtain information about what application(s) have been injected
321 into the Correlator and what listeners are in existence.
322
323 This runs as a FOREGROUND process.
324
325 @param filename: The basename of the file to write the information to, e.g. inspect.txt
326 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
327 @param raw: Use parser-friendly output format
328 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
329
330 """
331 assert filename
332
333
334 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_inspect')
335 displayName = "engine_inspect <%s> -> %s"%(self.name, os.path.basename(filename))
336
337
338 dstdout,dstderr = os.path.join(self.parent.output, filename), os.path.join(self.parent.output, filename.replace('.txt','')+'.err')
339
340
341 xargs=XArgsHolder(xargs, state=FOREGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)
342
343
344 if not filedir: filedir = self.parent.output
345 file = os.path.join(filedir, filename)
346
347
348 arguments = []
349 arguments.extend(["-p", "%d" % self.port])
350 if self.host: arguments.extend(["-n", self.host])
351 if raw: arguments.append("--raw")
352 arguments.extend(xargs.arguments)
353
354
355 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
356
357
358 - def initialize(self, path, correlatorName=None, properties=None, include=None, exclude=None, **xargs):
359 """Initialize the Correlator by injecting all the files making up the project,
360 typically based on a Designer launch configuration .deploy file.
361
362 This is usually the simplest way to inject all the files from an application
363 into the correlator. Alternative approaches are to call the L{injectEPL} and related
364 methods individually for each file, or to specify the files in the "initialization"
365 section of a yaml file passed into the correlator L{start} call using the "config" argument.
366
367 Queries and Digital Event .mon files will be generated automatically as part of injection,
368 but any Java jar files must be compiled manually before invoking this method.
369
370 @param path: Path of a .deploy file from Designer (recommended), a directory,
371 or a text file listing the files to be injected.
372 Must be an absolute path, or relative to the testcase output dir.
373 @param correlatorName: The name of the Correlator as specified in the launch configuration .deploy
374 file, e.g "defaultCorrelator". If not specified, the name of this pysys correlator will be used.
375 @param properties: Optional path to a .properties file specifying ${var} placeholders
376 to be used for resolving the paths of any files outside the project directory.
377 Absolute path or relative to output dir.
378 @param include: a comma-separated string specifying which of the project files found by the tool
379 should be injected, e.g. "**/foo/Bar*.evt,**.mon". If not specified, all files will be included
380 (unless specifically excluded)
381 @param exclude: a comma-separated string specifying which of the project files found by the tool
382 should NOT be injected, e.g. "**/foo/Bar*.evt".
383 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
384
385 """
386
387 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java")
388
389
390 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'initialize-%s'%self.name)
391
392
393 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
394
395
396 path = os.path.join(self.parent.output, path)
397
398
399 arguments = []
400 arguments.append("-Djava.awt.headless=true")
401 arguments.append("-DAPAMA_LOG_IMPL=log4j")
402 arguments.append("-Dlog4j.configuration=file:///%s/etc/engine-deploy-java-log4j.properties"%self.parent.project.APAMA_HOME.replace('\\','/'))
403 arguments.append("-Djava.io.tmpdir=%s"%self.parent.output)
404 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', 'ap-generate-project-init-list.jar')])
405 arguments.append(path)
406 if properties:
407 assert isinstance(properties, basestring)
408 arguments.append(os.path.join(self.parent.output, properties))
409 if correlatorName or ('!' not in path):
410 arguments.extend(['--correlatorName', correlatorName or self.name])
411 arguments.extend(['--inject', self.host, str(self.port)])
412 if include:
413 assert isinstance(include, basestring)
414 arguments.extend(['--include', include])
415 if exclude:
416 assert isinstance(exclude, basestring)
417 arguments.extend(['--exclude', exclude])
418 arguments.extend(xargs.arguments)
419
420
421 try:
422 status = self.parent.startProcess(command, arguments, self.environ,
423 displayName='initialize <%s> [%s]'%(self.name, os.path.basename(path)), **xargs.kwargs)
424 finally:
425 self.parent.logFileContents(xargs.stderr, maxLines=50)
426 self.parent.logFileContents(xargs.stdout, maxLines=50)
427
428 return status
429
430
431 - def injectEPL(self, filenames=[], filedir=None, utf8=False, **xargs):
432 """Inject EPL *.mon files into the Correlator.
433
434 See also L{initialize}.
435
436 @param filenames: List of the basename of EPL files to inject into the Correlator
437 @param filedir: Directory containing the input EPL files (defaults to testcase input directory)
438 @param utf8: Assume input is in UTF8
439 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
440
441 """
442 return self.__inject(filenames, filedir, utf8, False, False, **xargs)
443
444 - def injectJava(self, filename, filedir=None, **xargs):
445 """Inject a Java plug-in or application into the Correlator.
446
447 See also L{initialize}.
448
449 @param filename: The basename of the jar file to inject into the Correlator
450 @param filedir: The directory containing filename (defaults to testcase input subdirectory)
451 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
452
453 """
454 return self.__inject([filename], filedir, False, True, False, **xargs)
455
456
457 - def injectCDP(self, filenames=[], filedir=None, **xargs):
458 """Inject Correlator deployment package into the Correlator.
459
460 See also L{initialize}.
461
462 @param filenames: List of the basename of cdp files to inject into the Correlator
463 @param filedir: Directory containing the input cdp files (defaults to testcase input directory)
464 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
465
466 """
467 return self.__inject(filenames, filedir, False, False, True, **xargs)
468
469
470 - def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs):
471 """Inject a Scenario into the Correlator.
472
473 See also L{initialize}.
474
475 @param filename: The basename of the scenario definition file to inject into the Correlator
476 @param filedir: The directory containing filename (defaults to testcase input subdirectory)
477 @param debug: Generate debug EPL
478 @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location
479 @param functions: Sequence of tuples, where each tuple contains a function catalog name and location
480 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
481
482 """
483
484 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java")
485 jarbase = "ap-modeler"
486
487
488 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase)
489
490
491 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
492
493
494 self.__createScenarioManagerConfig(blocks, functions)
495
496
497 monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon")
498 if not filedir: filedir = self.parent.input
499 sdfName = os.path.join(filedir, filename)
500
501
502 arguments = []
503 arguments.append("-Djava.awt.headless=true")
504 arguments.append("-DAPAMA_LOG_IMPL=simple")
505 arguments.append("-DAPAMA_LOG_LEVEL=INFO")
506
507 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)])
508 arguments.extend(["-c", os.path.join(self.parent.output, "sm-config.xml")])
509 if not debug: arguments.extend(["-XgenerateDebug", "false"])
510 arguments.extend(["-Xgenerate", sdfName, monitorName])
511 arguments.extend(xargs.arguments)
512
513 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)
514
515
516 status = self.parent.startProcess(command, arguments, self.environ,
517 displayName='scenario generation for %s'%os.path.basename(sdfName), **xargs.kwargs)
518
519
520 if status and status.exitStatus == 0:
521 status = self.__inject([monitorName])
522 if status and status.exitStatus == 0:
523
524 os.remove(monitorName)
525 if status and status.exitStatus != 0 and not ignoreExitStatus:
526 self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
527
528 - def injectQuery(self, filename, filedir=None, diagnostics=False, **xargs):
529 """Inject a Query into the Correlator.
530
531 See also L{initialize}.
532
533 @param filename: The basename of the query file to inject into the Correlator
534 @param filedir: The directory containing filename (defaults to testcase input subdirectory)
535 @param diagnostics: Enable runtime diagnostic logging in the query
536 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
537
538 """
539
540 command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java")
541 jarbase = "ap-query-codegen"
542
543
544 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase)
545
546
547 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
548
549
550 monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon")
551 if not filedir: filedir = self.parent.input
552 qryName = os.path.join(filedir, filename)
553
554
555 arguments = []
556 arguments.append("-Djava.awt.headless=true")
557 arguments.append("-DAPAMA_LOG_IMPL=simple")
558 arguments.append("-DAPAMA_LOG_LEVEL=INFO")
559
560 arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)])
561 arguments.extend(["--host", self.host, "--port", '%s' % self.port, qryName, monitorName])
562 if diagnostics: arguments.extend(["--diagnostics"])
563 arguments.extend(xargs.arguments)
564
565 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)
566
567
568 status = self.parent.startProcess(command, arguments, self.environ,
569 displayName='query generation for %s'%os.path.basename(qryName), **xargs.kwargs)
570
571
572 if status and (status.exitStatus==0):
573 status = self.__inject([monitorName])
574 if status and status.exitStatus==0:
575
576 try:
577 os.remove(monitorName)
578 except Exception, e:
579 self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e)
580 self.flush(count=6)
581 if status and status.exitStatus != 0 and not ignoreExitStatus:
582 self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
583
584
586 """Send one or more event strings into the Correlator.
587
588 This method writes a temporary file containing the specified strings.
589
590 See the documentation for engine_send for more information.
591
592 @param eventStrings: One or more event strings to be sent to this correlator.
593 May be unicode or byte strings. May include a channel designator.
594
595 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
596
597 @keyword channel: The channel to which events are to be sent except when specified
598 on a per-event basis. If a channel is not specified for
599 an event and you do not specify this option, the event is delivered to the
600 default channel, which means the event will go to all public contexts.
601
602 """
603
604 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send')
605
606
607 dstdout,dstderr = self.parent.allocateUniqueStdOutErr('send')
608
609 assert eventStrings
610 assert not isinstance(eventStrings, basestring)
611
612
613 tmpfile = dstderr.replace('.err','')+'.tmp.evt'
614 with open(tmpfile, 'w') as f:
615 for l in eventStrings:
616 print >>f, l.strip().encode('utf-8')
617
618 displayName = "engine_send <%s> [%s%s]"%(self.name, eventStrings[0], ' (+%d others)'%(len(eventStrings)-1) if len(eventStrings)>1 else '')
619
620
621 arguments = []
622 arguments.extend(["-p", "%d" % self.port])
623 if self.host: arguments.extend(["-n", self.host])
624 arguments.append('--utf8')
625 if xargs.get('channel',''): arguments.extend(["--channel", xargs.pop('channel')])
626 arguments.append(tmpfile)
627
628 kwargs = xargs
629 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
630 arguments.extend(xargs.arguments)
631
632
633 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
634
635
636
637 - def send(self, filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs):
638 """Send events from one or more file into the Correlator.
639
640 See the documentation for engine_send for more information.
641
642 @param filenames: List of the basename of event files to send into the Correlator
643 @param filedir: Directory containing the input event files (defaults to testcase input directory)
644 @param loop: Number of times to loop through the input file
645 @param utf8: Assume input is in UTF8
646 @param channel: The channel to which events are to be sent except when specified
647 on a per-event basis. If a channel is not specified for
648 an event and you do not specify this option, the event is delivered to the
649 default channel, which means the event will go to all public contexts.
650 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
651
652 """
653
654 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send')
655
656
657 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'send')
658
659
660 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
661
662
663 files=[]
664 if not filedir: filedir = self.parent.input
665 if type(filenames) is types.ListType:
666 for file in filenames: files.append(os.path.join(filedir, file))
667 elif isinstance(filenames, basestring):
668 files.append(os.path.join(filedir, filenames))
669 else:
670 raise Exception("Input parameter for filenames is not a string or list type")
671
672 displayName = "engine_send <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files)))
673
674
675 arguments = []
676 arguments.extend(["-p", "%d" % self.port])
677 if self.host: arguments.extend(["-n", self.host])
678 if loop: arguments.extend(["--loop", "%s" % loop])
679 if utf8: arguments.append("--utf8")
680 if channel: arguments.extend(["--channel", channel])
681 arguments.extend(xargs.arguments)
682 arguments.extend(files)
683
684
685 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
686
687
688 - def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
689 """Delete named objects from the Event Crrelator.
690
691 @param names: List of names to delete from the Correlator
692 @param filename: The basename of a file containing a set of names to delete
693 @param filedir: The directory containing filename (defaults to testcase input subdirectory)
694 @param force: Force deletion of names even if they are in use
695 @param kill: Kill name even if it is a running monitor
696 @param all: Delete everything in the Correlator
697 @param utf8: Assume input is in UTF8
698 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
699
700 """
701
702 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_delete')
703 displayName = "engine_delete <%s> %s"%(self.name, '<all>' if all else '[%s]'%(' '.join(names)))
704
705
706 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'delete')
707
708
709 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
710
711
712 if not filedir and filename:
713 filedir = self.parent.input
714 filename = os.path.join(filedir, filename)
715
716
717 arguments = []
718 arguments.extend(["-p", "%d" % self.port])
719 if self.host: arguments.extend(["-n", self.host])
720 if filename: arguments.extend(["-f", filename])
721 if force: arguments.append("--force")
722 if kill: arguments.append("--kill")
723 if all: arguments.extend(["--all","--yes"])
724 if utf8: arguments.append("--utf8")
725 arguments.extend(xargs.arguments)
726 if names: arguments.extend(names)
727
728
729 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
730
731
732 - def connect(self, source, channel=None, channels=None, mode=None, **xargs):
733 """Connect a Correlator to this instance as a source.
734
735 @param source: An instance of the L{CorrelatorHelper} class to act as the source
736 @param channel: The channel to make the connection on
737 @param channels: The list of channels to make the connection on
738 @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel
739 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
740
741 """
742
743 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_connect')
744 displayName = "engine_connect %s<%s channel '%s' -> %s>"%(
745 'disconnect ' if ('-x' in xargs.get('arguments',[]) or '--disconnect' in xargs.get('arguments',[])) else '',
746 source.name, channel or '*', self.name)
747
748
749 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'connect')
750
751
752 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
753
754
755 arguments = []
756 arguments.extend(["-sn", source.host])
757 arguments.extend(["-sp", "%d" % source.port])
758 arguments.extend(["-tn", self.host])
759 arguments.extend(["-tp", "%d" % self.port])
760 if channel: arguments.extend(["-c", channel])
761 if channels:
762 assert not isinstance(channels, basestring)
763 for c in channels:
764 arguments.extend(["-c", c])
765 if mode: arguments.extend(["-m", mode])
766 arguments.extend(xargs.arguments)
767
768
769 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
770
771
772 - def disconnect(self, source, channel=None, channels=None, mode=None, **xargs):
773 """Disconnect a correlator to this instance as a source correlator.
774
775 @param source: An instance of the L{CorrelatorHelper} class acting as the source
776 @param channel: The channel to be disconnected
777 @param channels: The list of channels to be disconnected
778 @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel
779 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
780
781 """
782 if xargs.has_key('arguments'):
783 if xargs['arguments'].count('-x') == 0 and xargs['arguments'].count('--disconnect') == 0:
784 xargs['arguments'].append('-x')
785 else:
786 xargs['arguments'] = ['-x']
787 self.connect(source, channel, channels, mode, **xargs)
788
789
791 """Enable and disable application event logging.
792
793 Provides a wrapper around the engine_management command line tool to enable and disable
794 application event logging. Once enabled, application event logging will log to the correlator
795 log file information specific processing occurrences, e.g. the receipt of events for
796 processing, the triggering of listeners, execution of the garbage collector etc.
797
798 @param enable: Set to True to enable, set to False to disable event logging
799
800 """
801 if enable:
802 self.manage(arguments=['-r', 'applicationEventLogging', 'on'], **xargs)
803 else:
804 self.manage(arguments=['-r', 'applicationEventLogging', 'off'], **xargs)
805
806
808 """Set the application log file name.
809
810 On setting the application log file details, the output of all native log commands within
811 EPL will be logged to the designated log file. This allows separation between the
812 log statements written by the Event Correlator i.e. for status, errors etc, and those
813 generated by the actual application.
814
815 @param filename: The basename of the file to write the application log file to
816 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
817
818 """
819 if not filedir: filedir = self.parent.output
820 self.manage(arguments=['-r', 'setApplicationLogFile', os.path.join(filedir, filename)], **xargs)
821
822
824 """Set the application log level.
825
826 @param verbosity: The verbosity level of the application logging
827
828 """
829 self.manage(arguments=['-r', 'setApplicationLogLevel', verbosity], **xargs)
830
831
833 """Inform the Event Correlator to start collecting profiling statistics. """
834 self.manage(arguments=['-r', 'cpuProfile', 'on'], **xargs)
835
836
838 """Inform the Event Correlator to stop collecting profiling statistics. """
839 self.manage(arguments=['-r', 'cpuProfile', 'off'], **xargs)
840
841
843 """Inform the Event Correlator to reset it's collection of profiling statistics. """
844 self.manage(arguments=['-r', 'cpuProfile', 'reset'], **xargs)
845
846
848 """Obtain the latest profiling statistics from the Event Correlator.
849
850 @param filename: The basename of the file to write the profiling statistics to
851 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
852
853 """
854 if not filedir: filedir = self.parent.output
855 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','cpuProfile', 'get'], **xargs)
856
857
858 - def toStringAll(self, filename, filedir=None, **xargs):
859 """Obtain a stringified representation of the current application state from the Event Correlator.
860
861 @param filename: The basename of the file to write the dump of application state to
862 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
863
864 """
865 if not filedir: filedir = self.parent.output
866 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'], **xargs)
867
868
870 """Block until the Correlator declares itself to be ready for processing.
871
872 @deprecated: Use waitForComponentUp instead.
873 """
874 self.waitForComponentUp(*args, **kwargs)
875
876
877 - def flush(self, timeout=60, count=1, **xargs):
878 """Make sure all events have been flushed through the correlator.
879
880 Currently implemented by using the flushAllQueues management request.
881 Will initate a cycle where each queue in the correlator is drained,
882 optionally repeated count times. This is useful when you have a
883 multi-context application.
884
885 @param timeout: The amount of time to wait
886 @param count: The number of times to ensure queues are flushed
887
888 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
889
890 """
891 flushAllQueues = 'flushAllQueues'
892 if count > 1:
893 self.manage(arguments=['-r', 'flushAllQueues', str(count)], timeout=timeout, **xargs)
894 else:
895 self.manage(arguments=['-r', 'flushAllQueues'], timeout=timeout, **xargs)
896
897
898 - def __inject(self, filenames=[], filedir=None, utf8=False, java=False, cdp=False, **xargs):
899 """Inject an application into the correlator.
900
901 Returns the process object (or None in some error cases)
902
903 """
904
905 command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_inject')
906
907
908 dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'inject')
909
910
911 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
912
913
914 files=[]
915 if not filedir: filedir = self.parent.input
916 if type(filenames) is types.ListType:
917 for file in filenames: files.append(os.path.join(filedir, file))
918 elif isinstance(filenames, basestring):
919 files.append(os.path.join(filedir, filenames))
920 else:
921 raise Exception("Input parameter for filenames is not a string or list type")
922 displayName = "engine_inject <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files)))
923
924
925 arguments = []
926 arguments.extend(["-p", "%d" % self.port])
927 if self.host: arguments.extend(["-n", self.host])
928 if utf8: arguments.append("--utf8")
929 if java: arguments.append("--java")
930 if cdp: arguments.append("--cdp")
931 arguments.extend(xargs.arguments)
932 arguments.extend(files)
933
934
935 ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)
936
937
938 result = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
939 firstwarningline = ''
940 if result and result.exitStatus != 0:
941 with open(xargs.stderr) as f:
942 for l in f:
943 l = l.replace(self.parent.input+os.sep,'').strip()
944 if not l: continue
945 if not firstwarningline: firstwarningline = l
946 self.parent.log.warning(' %s'%l)
947 if result and result.exitStatus != 0 and not ignoreExitStatus:
948 self.parent.addOutcome(BLOCKED,
949 '%s failed: %s'%(result, firstwarningline) if firstwarningline else '%s returned non-zero exit code %d'%(result, result.exitStatus),
950 abortOnError=self.parent.defaultAbortOnError)
951
952 return result
953
954
956 """Create the Scenario Manager configuration file for the location of all blocks and functions.
957
958 """
959 impl = getDOMImplementation()
960 document = impl.createDocument(None, "config", None)
961 rootElement = document.documentElement
962
963
964 catalogsElement = document.createElement("catalogs")
965 rootElement.appendChild(catalogsElement)
966
967
968 blocksElement = document.createElement("blocks")
969
970 try:
971 try:
972 if self.parent.project.BLOCKS_DIR == "":
973 raise Exception("")
974 except Exception:
975 self.parent.project.BLOCKS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "blocks")
976 except AttributeError:
977 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT")
978
979 tup = [("blocks", self.parent.project.BLOCKS_DIR)]
980 if blocks: tup.extend(blocks)
981
982 for name, location in tup:
983 catalog = document.createElement("catalog")
984 nameAtttribute = document.createAttribute("name")
985 nameAtttribute.value=name
986 typeAtttribute = document.createAttribute("type")
987 typeAtttribute.value="FILE"
988 locationAtttribute = document.createAttribute("location")
989 locationAtttribute.value=location
990 catalog.setAttributeNode(nameAtttribute)
991 catalog.setAttributeNode(typeAtttribute)
992 catalog.setAttributeNode(locationAtttribute)
993 blocksElement.appendChild(catalog)
994 catalogsElement.appendChild(blocksElement)
995
996
997 functionsElement = document.createElement("functions")
998
999 try:
1000 try:
1001 if self.parent.project.FUNCTIONS_DIR == "":
1002 raise Exception("")
1003 except Exception:
1004 self.parent.project.FUNCTIONS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "functions")
1005 except AttributeError:
1006 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT")
1007
1008 tup = [("functions", self.parent.project.FUNCTIONS_DIR)]
1009 if functions: tup.extend(functions)
1010
1011 for name, location in tup:
1012 catalog = document.createElement("catalog")
1013 nameAtttribute = document.createAttribute("name")
1014 nameAtttribute.value=name
1015 typeAtttribute = document.createAttribute("type")
1016 typeAtttribute.value="FILE"
1017 locationAtttribute = document.createAttribute("location")
1018 locationAtttribute.value=location
1019 catalog.setAttributeNode(nameAtttribute)
1020 catalog.setAttributeNode(typeAtttribute)
1021 catalog.setAttributeNode(locationAtttribute)
1022 functionsElement.appendChild(catalog)
1023 catalogsElement.appendChild(functionsElement)
1024
1025 fp = open(os.path.join(self.parent.output, "sm-config.xml"), "w")
1026 fp.write(document.toprettyxml(indent=" "))
1027 fp.close()
1028
1029
1031 """ Does this correlator instance have access to a licence file? """
1032 return self.licence != None
1033