1
2
3
4
5
6 import sys, os, string, logging, socket, copy, random, types
7
8 from pysys import log, ThreadFilter
9 from pysys.constants import *
10 from pysys.exceptions import *
11
12 from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
13 from apama.common import XArgsHolder
14 from apama.common import stringToUnicode
15
16 from xml.dom.minidom import getDOMImplementation
17
18
20 """Helper class for the Software AG Apama Event Correlator.
21
22 The Correlator Helper class has been designed for use as an extension module to the PySys System Test
23 Framework, offering the ability to configure, start and interact with an Event Correlator. The usage
24 pattern of the class is to create an instance per Correlator, and to then make method calls onto the
25 instance to perform operations such as the injection of EPL or java JMON applications, the
26 sending of events, deletion of named objects etc. For example::
27
28 correlator = CorrelatorHelper(self, name='mycorrelator')
29 correlator.start(logfile="mycorrelator.log")
30 correlator.inject(filenames=["simple.mon"])
31
32 Process related methods of the class declare a method signature which includes named parameters for the
33 most frequently used options to the method. They also declare the **xargs parameter to allow passing in
34 of additional supported arguments to the process. The additional arguments that are currently supported
35 via **xargs are::
36
37 workingDir: The default value for the working directory of a process
38 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND)
39 timeout: The default value of the process timeout
40 stdout: The default value of the process stdout
41 stderr: The default value of the process stderr
42 arguments: List of extra arguments to be passed to the process
43
44 This means that legitimate calls to the start method include::
45
46 correlator.start(logfile="correlator.log")
47 correlator.start(logfile="correlator.log", stdout="correlator1.out")
48 correlator.start(state=FOREGROUND, timeout=5)
49
50
51 @ivar parent: Reference to the PySys testcase instantiating this class instance
52 @type parent: pysys.basetest
53 @ivar port: Port used for starting and interaction with the Correlator
54 @type port: integer
55 @ivar host: Hostname for interaction with a remote Correlator
56 @type host: string
57 @ivar environ: The environment for running the Correlator
58 @type environ: dictionary
59
60 """
61
62 """ Holds the Java classpath used when starting a correlator with JVM."""
63 classpath = None
64
65
def __init__(self, parent, port=None, host=None, name='correlator'):
    """Create an instance of the CorrelatorHelper class.

    If no port parameter is used in the argument list an available port will be dynamically found from
    the OS and used for starting the Correlator, and performing all operations against it. The host
    parameter is only used to perform operations against a remote Correlator started external to the
    PySys framework - the class does not support the starting of a Correlator remote to the localhost.

    @param parent: Reference to the parent PySys testcase
    @param port: The port used for starting and interacting with the Correlator
    @param host: The hostname used for interaction with a remote Correlator
    @param name: A display name for this process (default is "correlator"), also used for the default
    stdout/err filenames.

    """
    ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host)

    # The bare strings after each alias assignment are retained unchanged;
    # presumably the API documentation tool reads them as variable docstrings.
    self.injectJMON = self.injectJava
    """
    Alias for injectJava
    @see: L{injectJava}
    """
    self.injectMon = self.injectEPL
    """
    Alias for injectEPL
    @see: L{injectEPL}
    """
    self.injectMonitorscript = self.injectEPL
    """
    Alias for injectEPL
    @see: L{injectEPL}
    """

    # Use the "in" operator rather than dict.has_key(), which only exists on
    # Python 2 dictionaries.
    if "CLASSPATH" in self.environ:
        self.classpath = self.environ["CLASSPATH"]
        """
        Initialise the classpath to the environment CLASSPATH, if it exists.
        This will be removed once the correlator is no longer allowed to use CLASSPATH.
        """
104
105
107 """Add the supplied path to the Java classpath variable for starting this instance.
108
109 """
110 if self.classpath == None:
111 self.classpath = os.path.normpath(path)
112 else:
113 self.classpath = r"%s%s%s" % (self.classpath, ENVSEPERATOR, os.path.normpath(path))
114
116 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance.
117
118 """
119 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH"
120 else: key = "PATH"
121
122 if self.environ.has_key(key):
123 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path))
124 else:
125 self.environ[key] = os.path.normpath(path)
126
127
def start(self, logfile=None, verbosity=None, java=False, Xclock=None, environ=None, inputLog=None, waitForServerUp=True, config = None, **xargs):
    """Start the Correlator.

    @param logfile: Name of the Correlator log file (if used, set this to something similar to the display "name"
    passed to the constructor)
    @param verbosity: The verbosity level of the Correlator logging
    @param java: If True then the Correlator will be started with support for JMON applications
    @param Xclock: If True then the Correlator will be started in externally clocked mode
    @param environ: Map of environment variables to override.
    @param inputLog: Relative or absolute path of file to write input log to, containing all events, EPL and other
    inputs sent to the correlator. The format of the input log may change without notice so should not be
    relied upon in testcases, however it can be useful to review manually for diagnostic purposes, and the
    input log can also be used to pass information to customer support in the event of a problem.
    @param waitForServerUp: Set to False to disable automatically waiting until the component is ready
    @param config: path, or list/tuple of paths, to a initialization or connectivity configuration file or
    directory containing them
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
    @return: The process object returned by the parent testcase's startProcess
    @raise Exception: If config is neither a string nor a list/tuple, or if a process started by this
    instance is still running

    """
    # set the command
    command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'correlator')

    # set the default stdout and stderr
    dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name)

    # transform xargs into an instance of the xargs holder
    xargs = XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project)

    # only pass a licence file to the correlator if one exists in the work directory
    licence = os.path.join(self.parent.project.APAMA_WORK, 'license', 'ApamaServerLicense.xml')
    if not os.path.exists(licence):
        licence = None

    # set the arguments to the process
    arguments = ["--name", self.name, "-p", "%d" % self.port]
    if licence is not None: arguments.extend(["-l", "%s" % licence])
    if logfile: arguments.extend(["-f", logfile])
    if verbosity: arguments.extend(["-v", verbosity])
    if java: arguments.append("--java")
    if inputLog: arguments.extend(["--inputLog", os.path.join(self.parent.output, inputLog)])
    if Xclock: arguments.append("-Xclock")
    if config is not None:
        if isinstance(config, basestring):
            arguments.extend(['--config', config])
        elif isinstance(config, (list, tuple)):
            # isinstance also admits tuples and list subclasses, which an
            # exact type comparison would reject
            for param in config:
                arguments.extend(['--config', param])
        else:
            raise Exception("Input parameter for config is not a string or list type")
    if java and self.classpath is not None:
        arguments.extend(["--javaopt", "-Djava.class.path=%s" % self.classpath])
    arguments.extend(xargs.arguments)

    # build the process environment: instance environment first, then the
    # per-call overrides so that the latter take precedence
    env = {}
    if getattr(self.parent, 'eplcoverage', '').lower() == 'true':
        env['AP_EPL_COVERAGE_FILE'] = 'correlator.${PID}.eplcoverage'

    for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key])
    if environ:
        for key in environ: env[stringToUnicode(key)] = stringToUnicode(environ[key])

    if self.process and self.process.running():
        raise Exception('Cannot start component as an instance is already running: %s'%self)

    self.process = None
    try:
        hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs)
        self.process = hprocess
        if waitForServerUp:
            self.waitForComponentUp(timeout=xargs.timeout)
    except Exception:
        # log the first of the candidate files that yields matching error
        # lines, then let the original exception propagate
        for f in [logfile, xargs.stdout, xargs.stderr]:
            if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*', 'Invalid Correlator argument.*'], tail=True): break
        raise

    return hprocess
207
208
def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False,
            logChannels=False, **xargs):
    """Attach a receiver to the Correlator.

    Returns the process for the receiver.

    @param filename: The basename of the file to write events received from the Correlator to. If not
    specified no output file argument is passed to engine_receive and this method does not wait for
    an output file to appear.
    @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
    @param channels: List of channel names to subscribe to (only read, never modified)
    @param logChannels: Print the channel each event came from in the output
    @param suppressBatch: Do not include BATCH timestamps in the output
    @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived
    @param utf8: Write output in UTF8
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    # set the command
    command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_receive')

    # set the default stdout and stderr
    dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'receive')

    # fall back to the stdout name for display purposes so that the default
    # filename=None does not fail in os.path.basename
    displayName = 'engine_receive <%s> [%s]'%(self.name, os.path.basename(filename or dstdout))

    # transform xargs into an instance of the xargs holder
    xargs = XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)

    # set location of output files
    if not filedir: filedir = self.parent.output

    # set the arguments to the process
    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if filename: arguments.extend(["-f", os.path.join(filedir, filename)])
    if suppressBatch: arguments.append("--suppressbatch")
    if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch")
    if utf8: arguments.append("--utf8")
    if logChannels: arguments.append("--logChannels")
    arguments.extend(xargs.arguments)
    for channel in channels:
        arguments.extend(['--channel', channel])

    # start the process, and wait for the output file only if one was requested
    proc = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
    if filename:
        self.parent.waitForFile(filename, filedir = filedir)
    return proc
257
258
def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
    """Run engine_watch against the Correlator to collect runtime operational statistics.

    The watcher is started as a BACKGROUND process unless overridden through xargs, and the
    process returned by the parent testcase's startProcess is returned to the caller.

    @param filename: The basename of the file to write the runtime operational status to
    @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
    @param raw: Obtain csv format data when logging to file
    @param interval: The polling interval (seconds) between logging to file
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    @note: When outputing data in the raw (csv) format, the column identifiers and their positions
    are defined by:
        - Uptime = 0
        - Number of monitors = 1
        - Number of mthreads = 2
        - Number of java applications = 3
        - Number of listeners = 4
        - Number of sub-listeners = 5
        - Number of event types = 6
        - Number of events on input queue = 7
        - Number of events received = 8
        - Number of events on the route queue = 9
        - Number of events routed = 10
        - Number of attached consumers = 11
        - Number of events on output queue = 12
        - Number of output events created = 13
        - Number of output events sent = 14
        - Number of events processed = 15

    """
    executable = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_watch')

    # default stdout/stderr; the stdout name doubles as the display target
    # when no explicit output file was requested
    defaultOut, defaultErr = _allocateUniqueProcessStdOutErr(self.parent, 'watch')
    label = "engine_watch <%s> -> %s"%(self.name, os.path.basename(filename or defaultOut))

    xargs = XArgsHolder(xargs, state=BACKGROUND, stdout=defaultOut, stderr=defaultErr, project=self.parent.project)

    targetDir = filedir if filedir else self.parent.output

    # assemble the command line
    cmdline = ["-p", "%d" % self.port]
    if self.host:
        cmdline += ["-n", self.host]
    if filename:
        cmdline += ["-f", os.path.join(targetDir, filename)]
    if raw:
        cmdline.append("--raw")
    if interval:
        cmdline += ["-i", "%d" % interval]
    cmdline += xargs.arguments

    return self.parent.startProcess(executable, cmdline, self.environ, displayName=label, **xargs.kwargs)
315
def inspect(self, filename='inspect.txt', filedir=None, raw=False, **xargs):
    """Obtain information about what application(s) have been injected
    into the Correlator and what listeners are in existence.

    This runs as a FOREGROUND process. The information is captured by using
    filename as the default stdout of the process; the default stderr is the
    same name with '.txt' removed and '.err' appended.

    @param filename: The basename of the file to write the information to, e.g. inspect.txt
    @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
    @param raw: Use parser-friendly output format
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
    @return: The process object returned by the parent testcase's startProcess

    """
    assert filename

    # set the command and display name
    command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_inspect')
    displayName = "engine_inspect <%s> -> %s"%(self.name, os.path.basename(filename))

    # set location of output files; resolved before the defaults below so
    # that a caller-supplied filedir is honoured rather than ignored
    if not filedir: filedir = self.parent.output

    # set the default stdout and stderr
    dstdout = os.path.join(filedir, filename)
    dstderr = os.path.join(filedir, filename.replace('.txt','')+'.err')

    # transform xargs into an instance of the xargs holder
    xargs = XArgsHolder(xargs, state=FOREGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)

    # set the arguments to the process
    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if raw: arguments.append("--raw")
    arguments.extend(xargs.arguments)

    # start the process
    return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
353
354
def initialize(self, path, correlatorName=None, properties=None, include=None, exclude=None, **xargs):
    """Inject all the files making up a project into the Correlator,
    typically driven by a Designer launch configuration .deploy file.

    This is usually the simplest way to get a whole application into the correlator.
    The alternatives are calling L{injectEPL} and the related methods once per file, or
    listing the files in the "initialization" section of a yaml file passed to the
    correlator L{start} call using the "config" argument.

    Queries and Digital Event .mon files will be generated automatically as part of injection,
    but any Java jar files must be compiled manually before invoking this method.

    @param path: Path of a .deploy file from Designer (recommended), a directory,
    or a text file listing the files to be injected.
    Must be an absolute path, or relative to the testcase output dir.
    @param correlatorName: The name of the Correlator as specified in the launch configuration .deploy
    file, e.g "defaultCorrelator". If not specified, the name of this pysys correlator will be used.
    @param properties: Optional path to a .properties file specifying ${var} placeholders
    to be used for resolving the paths of any files outside the project directory.
    Absolute path or relative to output dir.
    @param include: a comma-separated string specifying which of the project files found by the tool
    should be injected, e.g. "**/foo/Bar*.evt,**.mon". If not specified, all files will be included
    (unless specifically excluded)
    @param exclude: a comma-separated string specifying which of the project files found by the tool
    should NOT be injected, e.g. "**/foo/Bar*.evt".
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    project = self.parent.project
    javaExe = os.path.join(project.APAMA_COMMON_JRE, "bin", "java")

    defaultOut, defaultErr = _allocateUniqueProcessStdOutErr(self.parent, 'initialize-%s'%self.name)
    xargs = XArgsHolder(xargs, stdout=defaultOut, stderr=defaultErr, project=self.parent.project)

    # relative paths are resolved against the testcase output directory
    path = os.path.join(self.parent.output, path)

    # JVM options, followed by the tool jar and its own arguments
    arguments = [
        "-Djava.awt.headless=true",
        "-DAPAMA_LOG_IMPL=log4j",
        "-Dlog4j.configuration=file:///%s/etc/engine-deploy-java-log4j.properties"%self.parent.project.APAMA_HOME.replace('\\','/'),
        "-Djava.io.tmpdir=%s"%self.parent.output,
        "-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', 'ap-generate-project-init-list.jar'),
        path,
    ]
    if properties:
        assert isinstance(properties, basestring)
        arguments.append(os.path.join(self.parent.output, properties))
    if correlatorName or ('!' not in path):
        arguments += ['--correlatorName', correlatorName or self.name]
    arguments += ['--inject', self.host, str(self.port)]
    if include:
        assert isinstance(include, basestring)
        arguments += ['--include', include]
    if exclude:
        assert isinstance(exclude, basestring)
        arguments += ['--exclude', exclude]
    arguments += xargs.arguments

    label = 'initialize <%s> [%s]'%(self.name, os.path.basename(path))
    try:
        status = self.parent.startProcess(javaExe, arguments, self.environ, displayName=label, **xargs.kwargs)
    finally:
        # always surface the tool's output, whether or not it succeeded
        self.parent.logFileContents(xargs.stderr, maxLines=50)
        self.parent.logFileContents(xargs.stdout, maxLines=50)

    return status
426
427
def injectEPL(self, filenames=[], filedir=None, utf8=False, **xargs):
    """Inject one or more EPL *.mon files into the Correlator.

    See also L{initialize}.

    @param filenames: List of the basename of EPL files to inject into the Correlator
    @param filedir: Directory containing the input EPL files (defaults to testcase input directory)
    @param utf8: Assume input is in UTF8
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    # The two trailing positional flags are left off here; within this class
    # they are switched on only by injectJava and injectCDP respectively.
    positional = (filenames, filedir, utf8, False, False)
    return self.__inject(*positional, **xargs)
440
def injectJava(self, filename, filedir=None, **xargs):
    """Inject a single Java plug-in or application jar into the Correlator.

    See also L{initialize}.

    @param filename: The basename of the jar file to inject into the Correlator
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    # The single jar is wrapped in a list; of the three trailing positional
    # flags only the middle one is switched on for this call.
    positional = ([filename], filedir, False, True, False)
    return self.__inject(*positional, **xargs)
452
453
def injectCDP(self, filenames=[], filedir=None, **xargs):
    """Inject one or more Correlator deployment packages into the Correlator.

    See also L{initialize}.

    @param filenames: List of the basename of cdp files to inject into the Correlator
    @param filedir: Directory containing the input cdp files (defaults to testcase input directory)
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    # Of the three trailing positional flags only the last one is switched
    # on for this call.
    positional = (filenames, filedir, False, False, True)
    return self.__inject(*positional, **xargs)
465
466
def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs):
    """Inject a Scenario into the Correlator.

    A .mon file is generated from the scenario definition into the testcase output
    directory and then injected; the generated file is removed again after a
    successful injection.

    See also L{initialize}.

    @param filename: The basename of the scenario definition file to inject into the Correlator
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param debug: Generate debug EPL
    @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location
    @param functions: Sequence of tuples, where each tuple contains a function catalog name and location
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    # set the command and display name
    command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java")
    jarbase = "ap-modeler"

    # set the default stdout and stderr
    dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase)

    # transform xargs into an instance of the xargs holder
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)

    # create the scenario manager config file
    self.__createScenarioManagerConfig(blocks, functions)

    # set location of the generated monitor and of the input scenario file
    monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon")
    if not filedir: filedir = self.parent.input
    sdfName = os.path.join(filedir, filename)

    # set the arguments to the process
    arguments = [
        "-Djava.awt.headless=true",
        "-DAPAMA_LOG_IMPL=simple",
        "-DAPAMA_LOG_LEVEL=INFO",
    ]
    arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)])
    arguments.extend(["-c", os.path.join(self.parent.output, "sm-config.xml")])
    if not debug: arguments.extend(["-XgenerateDebug", "false"])
    arguments.extend(["-Xgenerate", sdfName, monitorName])
    arguments.extend(xargs.arguments)

    # taken out of the startProcess kwargs so that a non-zero exit status
    # is reported by this method (see the end of the method)
    ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)

    # run the scenario generation
    status = self.parent.startProcess(command, arguments, self.environ,
        displayName='scenario generation for %s'%os.path.basename(sdfName), **xargs.kwargs)

    # inject the generated monitor
    if status and status.exitStatus == 0:
        status = self.__inject([monitorName])
        if status and status.exitStatus == 0:
            # tidy up, but only if it worked; cleanup is best-effort (as in
            # injectQuery) so a failure to delete does not fail the injection
            try:
                os.remove(monitorName)
            except Exception as e:
                self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e)
    if status and status.exitStatus != 0 and not ignoreExitStatus:
        self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
524
def injectQuery(self, filename, filedir=None, diagnostics=False, **xargs):
    """Inject a Query into the Correlator.

    A .mon file is generated from the query file into the testcase output
    directory and then injected; the generated file is removed again after a
    successful injection.

    See also L{initialize}.

    @param filename: The basename of the query file to inject into the Correlator
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param diagnostics: Enable runtime diagnostic logging in the query
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    # set the command and display name
    command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java")
    jarbase = "ap-query-codegen"

    # set the default stdout and stderr
    dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase)

    # transform xargs into an instance of the xargs holder
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)

    # set location of the generated monitor and of the input query file
    monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon")
    if not filedir: filedir = self.parent.input
    qryName = os.path.join(filedir, filename)

    # set the arguments to the process
    arguments = [
        "-Djava.awt.headless=true",
        "-DAPAMA_LOG_IMPL=simple",
        "-DAPAMA_LOG_LEVEL=INFO",
    ]
    arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)])
    arguments.extend(["--host", self.host, "--port", '%s' % self.port, qryName, monitorName])
    if diagnostics: arguments.extend(["--diagnostics"])
    arguments.extend(xargs.arguments)

    # taken out of the startProcess kwargs so that a non-zero exit status
    # is reported by this method (see the end of the method)
    ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)

    # run the query generation
    status = self.parent.startProcess(command, arguments, self.environ,
        displayName='query generation for %s'%os.path.basename(qryName), **xargs.kwargs)

    # inject the generated monitor
    if status and (status.exitStatus==0):
        status = self.__inject([monitorName])
        if status and status.exitStatus==0:
            # tidy up, but only if it worked; cleanup is best-effort
            try:
                os.remove(monitorName)
            except Exception as e:
                self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e)
            # NOTE(review): the nesting level of this flush call is assumed
            # (after a successful injection) - confirm against the intended
            # control flow.
            self.flush(count=6)
    if status and status.exitStatus != 0 and not ignoreExitStatus:
        self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
580
581
583 """Send one or more event strings into the Correlator.
584
585 This method writes a temporary file containing the specified strings.
586
587 See the documentation for engine_send for more information.
588
589 @param eventStrings: One or more event strings to be sent to this correlator.
590 May be unicode or byte strings. May include a channel designator.
591
592 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
593
594 @keyword channel: The channel to which events are to be sent except when specified
595 on a per-event basis. If a channel is not specified for
596 an event and you do not specify this option, the event is delivered to the
597 default channel, which means the event will go to all public contexts.
598
599 """
600
601 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send')
602
603
604 dstdout,dstderr = self.parent.allocateUniqueStdOutErr('send')
605
606 assert eventStrings
607 assert not isinstance(eventStrings, basestring)
608
609
610 tmpfile = dstderr.replace('.err','')+'.tmp.evt'
611 with open(tmpfile, 'w') as f:
612 for l in eventStrings:
613 print >>f, l.strip().encode('utf-8')
614
615 displayName = "engine_send <%s> [%s%s]"%(self.name, eventStrings[0], ' (+%d others)'%(len(eventStrings)-1) if len(eventStrings)>1 else '')
616
617
618 arguments = []
619 arguments.extend(["-p", "%d" % self.port])
620 if self.host: arguments.extend(["-n", self.host])
621 arguments.append('--utf8')
622 if xargs.get('channel',''): arguments.extend(["--channel", xargs.pop('channel')])
623 arguments.append(tmpfile)
624
625 kwargs = xargs
626 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
627 arguments.extend(xargs.arguments)
628
629
630 return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
631
632
633
def send(self, filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs):
    """Run engine_send to send the events in one or more files into the Correlator.

    See the documentation for engine_send for more information.

    @param filenames: List of the basename of event files to send into the Correlator
    (a single string is also accepted)
    @param filedir: Directory containing the input event files (defaults to testcase input directory)
    @param loop: Number of times to loop through the input file
    @param utf8: Assume input is in UTF8
    @param channel: The channel to which events are to be sent except when specified
    on a per-event basis. If a channel is not specified for
    an event and you do not specify this option, the event is delivered to the
    default channel, which means the event will go to all public contexts.
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    executable = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send')

    defaultOut, defaultErr = _allocateUniqueProcessStdOutErr(self.parent, 'send')
    xargs = XArgsHolder(xargs, stdout=defaultOut, stderr=defaultErr, project=self.parent.project)

    # resolve every input file against the input directory
    sourceDir = filedir if filedir else self.parent.input
    if type(filenames) is types.ListType:
        eventFiles = [os.path.join(sourceDir, entry) for entry in filenames]
    elif isinstance(filenames, basestring):
        eventFiles = [os.path.join(sourceDir, filenames)]
    else:
        raise Exception("Input parameter for filenames is not a string or list type")

    label = "engine_send <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, eventFiles)))

    # assemble the command line; the event files go last
    cmdline = ["-p", "%d" % self.port]
    if self.host:
        cmdline += ["-n", self.host]
    if loop:
        cmdline += ["--loop", "%s" % loop]
    if utf8:
        cmdline.append("--utf8")
    if channel:
        cmdline += ["--channel", channel]
    cmdline += xargs.arguments
    cmdline += eventFiles

    return self.parent.startProcess(executable, cmdline, self.environ, displayName=label, **xargs.kwargs)
683
684
def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
    """Delete named objects from the Event Correlator.

    @param names: List of names to delete from the Correlator (only read, never modified)
    @param filename: The basename of a file containing a set of names to delete
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param force: Force deletion of names even if they are in use
    @param kill: Kill name even if it is a running monitor
    @param all: Delete everything in the Correlator
    @param utf8: Assume input is in UTF8
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
    @return: The process object returned by the parent testcase's startProcess

    """
    # set the command and display name
    command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_delete')
    displayName = "engine_delete <%s> %s"%(self.name, '<all>' if all else '[%s]'%(' '.join(names)))

    # set the default stdout and stderr
    dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'delete')

    # transform xargs into an instance of the xargs holder
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)

    # set location of input files (note that filename is not an output file);
    # the join is applied whenever a filename is given so that an explicitly
    # supplied filedir is honoured as well as the default input directory
    if filename:
        if not filedir: filedir = self.parent.input
        filename = os.path.join(filedir, filename)

    # set the arguments to the process
    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if filename: arguments.extend(["-f", filename])
    if force: arguments.append("--force")
    if kill: arguments.append("--kill")
    if all: arguments.extend(["--all","--yes"])
    if utf8: arguments.append("--utf8")
    arguments.extend(xargs.arguments)
    if names: arguments.extend(names)

    # start the process
    return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
727
728
def connect(self, source, channel=None, channels=None, mode=None, **xargs):
    """Run engine_connect so that another Correlator acts as a source for this instance.

    @param source: An instance of the L{CorrelatorHelper} class to act as the source
    @param channel: The channel to make the connection on
    @param channels: The list of channels to make the connection on
    @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

    """
    executable = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_connect')

    # the display name is prefixed when the extra arguments request a disconnect
    if '-x' in xargs.get('arguments',[]) or '--disconnect' in xargs.get('arguments',[]):
        prefix = 'disconnect '
    else:
        prefix = ''
    label = "engine_connect %s<%s channel '%s' -> %s>"%(prefix, source.name, channel or '*', self.name)

    defaultOut, defaultErr = _allocateUniqueProcessStdOutErr(self.parent, 'connect')
    xargs = XArgsHolder(xargs, stdout=defaultOut, stderr=defaultErr, project=self.parent.project)

    # source endpoint first, then this instance as the target
    cmdline = [
        "-sn", source.host,
        "-sp", "%d" % source.port,
        "-tn", self.host,
        "-tp", "%d" % self.port,
    ]
    if channel:
        cmdline += ["-c", channel]
    if channels:
        assert not isinstance(channels, basestring)
        for name in channels:
            cmdline += ["-c", name]
    if mode:
        cmdline += ["-m", mode]
    cmdline += xargs.arguments

    return self.parent.startProcess(executable, cmdline, self.environ, displayName=label, **xargs.kwargs)
767
768
def disconnect(self, source, channel=None, channels=None, mode=None, **xargs):
    """Disconnect a source Correlator from this instance.

    Delegates to L{connect}, making sure the disconnect flag ('-x') is among
    the extra process arguments unless the caller already supplied '-x' or
    '--disconnect'.

    @param source: An instance of the L{CorrelatorHelper} class acting as the source
    @param channel: The channel to be disconnected
    @param channels: The list of channels to be disconnected
    @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
    @return: The value returned by L{connect}

    """
    # Work on a copy so that the caller's own list is never modified
    arguments = list(xargs.get('arguments') or [])
    if '-x' not in arguments and '--disconnect' not in arguments:
        arguments.append('-x')
    xargs['arguments'] = arguments
    return self.connect(source, channel, channels, mode, **xargs)
785
786
788 """Enable and disable application event logging.
789
790 Provides a wrapper around the engine_management command line tool to enable and disable
791 application event logging. Once enabled, the correlator writes information about specific
792 processing occurrences to its log file, e.g. the receipt of events for
793 processing, the triggering of listeners, execution of the garbage collector etc.
794
795 @param enable: Set to True to enable, set to False to disable event logging
796
797 """
798 if enable:
799 self.manage(arguments=['-r', 'applicationEventLogging', 'on'], **xargs)
800 else:
801 self.manage(arguments=['-r', 'applicationEventLogging', 'off'], **xargs)
802
803
805 """Set the application log file name.
806
807 On setting the application log file details, the output of all native log commands within
808 EPL will be logged to the designated log file. This allows separation between the
809 log statements written by the Event Correlator i.e. for status, errors etc, and those
810 generated by the actual application.
811
812 @param filename: The basename of the file to write the application log file to
813 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
814
815 """
816 if not filedir: filedir = self.parent.output
817 self.manage(arguments=['-r', 'setApplicationLogFile', os.path.join(filedir, filename)], **xargs)
818
819
821 """Set the application log level.
822
823 @param verbosity: The verbosity level of the application logging
824
825 """
826 self.manage(arguments=['-r', 'setApplicationLogLevel', verbosity], **xargs)
827
828
830 """Inform the Event Correlator to start collecting profiling statistics. """
831 self.manage(arguments=['-r', 'cpuProfile', 'on'], **xargs)
832
833
835 """Inform the Event Correlator to stop collecting profiling statistics. """
836 self.manage(arguments=['-r', 'cpuProfile', 'off'], **xargs)
837
838
840 """Inform the Event Correlator to reset its collection of profiling statistics. """
841 self.manage(arguments=['-r', 'cpuProfile', 'reset'], **xargs)
842
843
845 """Obtain the latest profiling statistics from the Event Correlator.
846
847 @param filename: The basename of the file to write the profiling statistics to
848 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
849
850 """
851 if not filedir: filedir = self.parent.output
852 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','cpuProfile', 'get'], **xargs)
853
854
def toStringAll(self, filename, filedir=None, **xargs):
    """Dump a stringified representation of the current application state to a file.

    Issues the 'toStringAll' request through L{manage}, directing the
    process stdout to the requested file.

    @param filename: The basename of the file to write the dump of application state to
    @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
    @param xargs: Additional keyword arguments passed through to L{manage}

    """
    # Fall back to the testcase output directory when no directory is given
    targetDir = filedir or self.parent.output
    dumpFile = os.path.join(targetDir, filename)
    self.manage(stdout=dumpFile, arguments=['-r','toStringAll'], **xargs)
864
865
867 """Block until the Correlator declares itself to be ready for processing.
868
869 @deprecated: Use waitForComponentUp instead.
870 """
871 self.waitForComponentUp(*args, **kwargs)
872
873
def flush(self, timeout=60, count=1, **xargs):
    """Make sure all events have been flushed through the correlator.

    Currently implemented by using the flushAllQueues management request.
    Will initiate a cycle where each queue in the correlator is drained,
    optionally repeated count times. This is useful when you have a
    multi-context application.

    @param timeout: The amount of time to wait
    @param count: The number of times to ensure queues are flushed; only passed
        to the request when greater than 1
    @param xargs: Additional keyword arguments passed through to L{manage}, e.g.
        ignoreExitStatus, workingDir. Do not supply C{arguments} here: this method
        passes its own C{arguments} keyword, so a second one raises TypeError

    """
    request = ['-r', 'flushAllQueues']
    if count > 1:
        # The repeat count is sent as a string argument to the request
        request.append(str(count))
    self.manage(arguments=request, timeout=timeout, **xargs)
893
894
def __inject(self, filenames=[], filedir=None, utf8=False, java=False, cdp=False, **xargs):
    """Inject an application into the correlator.

    Runs the engine_inject tool against this correlator's port (and host, if set).
    If the process finishes with a non-zero exit status, every non-blank line of
    its stderr is logged as a warning and, unless C{ignoreExitStatus} was passed,
    a BLOCKED outcome is added to the parent testcase.

    @param filenames: A single filename, or a list/tuple of filenames, to inject
        (the default list is never modified)
    @param filedir: The directory containing the files (defaults to the testcase input directory)
    @param utf8: Set to True to pass --utf8 to engine_inject
    @param java: Set to True to pass --java to engine_inject
    @param cdp: Set to True to pass --cdp to engine_inject
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
    @return: The process object returned by startProcess (or None in some error cases)

    """
    command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_inject')

    # Default stdout/stderr files, used unless overridden through xargs
    dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'inject')
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)

    # Resolve the file(s) to inject against the input directory. Strings are
    # tested first so that a single name is never iterated character by character;
    # isinstance also admits tuples and list subclasses.
    if not filedir: filedir = self.parent.input
    if isinstance(filenames, basestring):
        files = [os.path.join(filedir, filenames)]
    elif isinstance(filenames, (list, tuple)):
        files = [os.path.join(filedir, name) for name in filenames]
    else:
        raise Exception("Input parameter for filenames is not a string or list type")
    displayName = "engine_inject <%s> [%s]" % (self.name, ' '.join(map(os.path.basename, files)))

    # Build the command line: connection details, flags, extra arguments, then the files
    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if utf8: arguments.append("--utf8")
    if java: arguments.append("--java")
    if cdp: arguments.append("--cdp")
    arguments.extend(xargs.arguments)
    arguments.extend(files)

    # Removed from the keyword arguments so that a failure is reported below
    # by this method, with the first stderr line included in the outcome reason
    ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)

    result = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
    firstwarningline = ''
    if result and result.exitStatus != 0:
        with open(xargs.stderr) as f:
            for line in f:
                # Strip the input directory prefix to keep the messages short
                line = line.replace(self.parent.input + os.sep, '').strip()
                if not line: continue
                if not firstwarningline: firstwarningline = line
                self.parent.log.warning('  %s' % line)
    if result and result.exitStatus != 0 and not ignoreExitStatus:
        self.parent.addOutcome(BLOCKED,
            '%s failed: %s' % (result, firstwarningline) if firstwarningline else '%s returned non-zero exit code %d' % (result, result.exitStatus),
            abortOnError=self.parent.defaultAbortOnError)

    return result
950
951
953 """Create the Scenario Manager configuration file for the location of all blocks and functions.
954
955 """
956 impl = getDOMImplementation()
957 document = impl.createDocument(None, "config", None)
958 rootElement = document.documentElement
959
960
961 catalogsElement = document.createElement("catalogs")
962 rootElement.appendChild(catalogsElement)
963
964
965 blocksElement = document.createElement("blocks")
966
967 try:
968 try:
969 if self.parent.project.BLOCKS_DIR == "":
970 raise Exception("")
971 except Exception:
972 self.parent.project.BLOCKS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "blocks")
973 except AttributeError:
974 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT")
975
976 tup = [("blocks", self.parent.project.BLOCKS_DIR)]
977 if blocks: tup.extend(blocks)
978
979 for name, location in tup:
980 catalog = document.createElement("catalog")
981 nameAtttribute = document.createAttribute("name")
982 nameAtttribute.value=name
983 typeAtttribute = document.createAttribute("type")
984 typeAtttribute.value="FILE"
985 locationAtttribute = document.createAttribute("location")
986 locationAtttribute.value=location
987 catalog.setAttributeNode(nameAtttribute)
988 catalog.setAttributeNode(typeAtttribute)
989 catalog.setAttributeNode(locationAtttribute)
990 blocksElement.appendChild(catalog)
991 catalogsElement.appendChild(blocksElement)
992
993
994 functionsElement = document.createElement("functions")
995
996 try:
997 try:
998 if self.parent.project.FUNCTIONS_DIR == "":
999 raise Exception("")
1000 except Exception:
1001 self.parent.project.FUNCTIONS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "functions")
1002 except AttributeError:
1003 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT")
1004
1005 tup = [("functions", self.parent.project.FUNCTIONS_DIR)]
1006 if functions: tup.extend(functions)
1007
1008 for name, location in tup:
1009 catalog = document.createElement("catalog")
1010 nameAtttribute = document.createAttribute("name")
1011 nameAtttribute.value=name
1012 typeAtttribute = document.createAttribute("type")
1013 typeAtttribute.value="FILE"
1014 locationAtttribute = document.createAttribute("location")
1015 locationAtttribute.value=location
1016 catalog.setAttributeNode(nameAtttribute)
1017 catalog.setAttributeNode(typeAtttribute)
1018 catalog.setAttributeNode(locationAtttribute)
1019 functionsElement.appendChild(catalog)
1020 catalogsElement.appendChild(functionsElement)
1021
1022 fp = open(os.path.join(self.parent.output, "sm-config.xml"), "w")
1023 fp.write(document.toprettyxml(indent=" "))
1024 fp.close()
1025