1
2
3
4
5
6 import sys, os, string, logging, socket, copy, random, types
7
8 from pysys import log, ThreadFilter
9 from pysys.constants import *
10 from pysys.exceptions import *
11 from pysys.process.helper import ProcessWrapper
12
13 from apama.common import XArgsHolder
14 from apama.common import stringToUnicode
15
16 from xml.dom.minidom import getDOMImplementation
17
18
# Import-time validation of the PySys project: these variables are mandatory,
# so refuse to load the module when one is missing or blank.
for variable in ("APAMA_COMMON_JRE", "APAMA_LIBRARY_VERSION"):
    if not hasattr(PROJECT, variable):
        raise Exception("Project variable %s undefined in .pysysproject" % variable)
    if getattr(PROJECT, variable) == "":
        raise Exception("Project variable %s not set within the environment" % variable)

# APAMA_BIN_DIR is optional: when it is absent or blank, derive it from
# APAMA_HOME. A missing APAMA_HOME surfaces as AttributeError and is reported
# as not running inside an Apama environment.
try:
    if getattr(PROJECT, "APAMA_BIN_DIR", "") == "":
        PROJECT.APAMA_BIN_DIR = os.path.join(PROJECT.APAMA_HOME, 'bin')
except AttributeError:
    raise Exception("Not running inside an Apama environment. APAMA_HOME and APAMA_WORK not set")
34
36 """Helper class for the Software AG Apama Event Correlator.
37
38 The Correlator Helper class has been designed for use as an extension module to the PySys System Test
39 Framework, offering the ability to configure, start and interact with an Event Correlator. The usage
40 pattern of the class is to create an instance per Correlator, and to then make method calls onto the
41 instance to perform operations such as the injection of monitorscript or java JMON applications, the
42 sending of events, deletion of named objects etc. For example::
43
44 correlator = CorrelatorHelper(self)
45 correlator.start(logfile="correlator.log")
46 correlator.inject(filenames=["simple.mon"])
47
48 Process related methods of the class declare a method signature which includes named parameters for the
49 most frequently used options to the method. They also declare the **xargs parameter to allow passing in
50 of additional supported arguments to the process. The additional arguments that are currently supported
51 via **xargs are::
52
53 workingDir: The default value for the working directory of a process
54 state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND)
55 timeout: The default value of the process timeout
56 stdout: The default value of the process stdout
57 stderr: The default value of the process stderr
58 arguments: List of extra arguments to be passed to the process
59
60 This means that legitimate calls to the start method include::
61
62 correlator.start(logfile="correlator.log")
63 correlator.start(logfile="correlator.log", stdout="correlator1.out")
64 correlator.start(state=FOREGROUND, timeout=5)
65
66
67 @ivar parent: Reference to the PySys testcase instantiating this class instance
68 @type parent: pysys.basetest
69 @ivar port: Port used for starting and interaction with the Correlator
70 @type port: integer
71 @ivar host: Hostname for interaction with a remote Correlator
72 @type host: string
73 @ivar environ: The environment for running the Correlator
74 @type environ: dictionary
75
76 """
77
def __init__(self, parent, port=None, host=None):
    """Create an instance of the CorrelatorHelper class.

    If no port parameter is used in the argument list an available port will be dynamically found from
    the OS and used for starting the Correlator, and performing all operations against it. The host
    parameter is only used to perform operations against a remote Correlator started external to the
    PySys framework - the class does not support the starting of a Correlator remote to the localhost.

    @param parent: Reference to the parent PySys testcase
    @param port: The port used for starting and interacting with the Correlator
    (defaults to the value of parent.getNextAvailableTCPPort())
    @param host: The hostname used for interaction with a remote Correlator (defaults to "localhost")

    """
    self.parent = parent
    # Identity comparison with None: equality could be hijacked by an
    # argument type that overrides __eq__.
    self.port = parent.getNextAvailableTCPPort() if port is None else port
    self.host = "localhost" if host is None else host

    # Take a private, unicode-converted snapshot of the current process
    # environment; it is used as the environment of the processes started
    # through this helper.
    self.environ = {}
    for key in os.environ:
        self.environ[stringToUnicode(key)] = stringToUnicode(os.environ[key])
99
100
102 """Add the supplied path to the CLASSPATH environment variable for starting this instance.
103
104 """
105 if self.environ.has_key("CLASSPATH"):
106 self.environ["CLASSPATH"] = r"%s%s%s" % (self.environ["CLASSPATH"], ENVSEPERATOR, os.path.normpath(path))
107 else:
108 self.environ["CLASSPATH"] = os.path.normpath(path)
109
110
112 """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance.
113
114 """
115 if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH"
116 else: key = "PATH"
117
118 if self.environ.has_key(key):
119 self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path))
120 else:
121 self.environ[key] = os.path.normpath(path)
122
123
def start(self, logfile=None, verbosity=None, java=None, Xclock=None, environ=None, **xargs):
    """Start the Correlator.

    @param logfile: Name of the Correlator log file
    @param verbosity: The verbosity level of the Correlator logging
    @param java: If pysys.constants.True then the Correlator will be started with support for JMON applications
    @param Xclock: If pysys.constants.True then the Correlator will be started in externally clocked mode
    @param environ: Optional dictionary of additional environment variables for this process; its
    entries take precedence over those held in self.environ
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    """
    command = os.path.join(PROJECT.APAMA_BIN_DIR, 'correlator')
    displayName = "correlator"

    # Default stdout/stderr names, made unique per running instance.
    instances = self.parent.getInstanceCount(displayName)
    dstdout = os.path.join(self.parent.output, 'correlator.out')
    dstderr = os.path.join(self.parent.output, 'correlator.err')
    if instances:
        dstdout = "%s.%d" % (dstdout, instances)
        dstderr = "%s.%d" % (dstderr, instances)

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr)

    # Only pass a licence file when one exists in the work directory.
    licence = os.path.join(PROJECT.APAMA_WORK, 'license', 'license.txt')
    if not os.path.exists(licence):
        licence = None

    arguments = ["-p", "%d" % self.port]
    if licence is not None: arguments.extend(["-l", "%s" % licence])
    if logfile: arguments.extend(["-f", logfile])
    if verbosity: arguments.extend(["-v", verbosity])
    if java: arguments.append("--java")
    if Xclock: arguments.append("-Xclock")
    if xargs.arguments: arguments.extend(xargs.arguments)

    # Build the process environment: start from the instance environment and
    # then overlay the per-call values. The dictionary must exist before the
    # overlay is applied, and the overlay values come from the environ
    # argument rather than from self.environ.
    env = {}
    for overlay in (self.environ, environ or {}):
        for key in overlay:
            env[stringToUnicode(key)] = stringToUnicode(overlay[key])

    hprocess = self.parent.startProcess(command, arguments, env, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
    self.waitForCorrelatorUp()
    return hprocess
174
175
def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=None, utf8=False, **xargs):
    """Attach a receiver to the Correlator.

    @param filename: The basename of the file to write events received from the Correlator to
    (when omitted no -f option is passed and no wait for the file is performed)
    @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
    @param channels: List of channel names to subscribe to
    @param suppressBatch: Do not include BATCH timestamps in the output
    @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived
    @param utf8: Write output in UTF8
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    """
    command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_receive')
    displayName = "engine_receive"

    # Default stdout/stderr names, made unique per running instance.
    instances = self.parent.getInstanceCount(displayName)
    dstdout = os.path.join(self.parent.output, 'receive.out')
    dstderr = os.path.join(self.parent.output, 'receive.err')
    if instances:
        dstdout = "%s.%d" % (dstdout, instances)
        dstderr = "%s.%d" % (dstderr, instances)

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr)

    # filename is optional, so only build the output path when it was given;
    # os.path.join cannot accept None.
    if not filedir: filedir = self.parent.output
    outputPath = os.path.join(filedir, filename) if filename else None

    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if outputPath: arguments.extend(["-f", outputPath])
    if suppressBatch: arguments.append("--suppressbatch")
    if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch")
    if utf8: arguments.append("--utf8")
    if xargs.arguments: arguments.extend(xargs.arguments)
    for channel in channels:
        arguments.extend(['--channel', channel])

    proc = self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
    # Wait for the receiver's output file to appear; there is nothing to
    # wait for when no output file was requested.
    if filename: self.parent.waitForFile(filename, filedir=filedir)
    return proc
222
223
def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
    """Obtain runtime operational status from the Correlator.

    @param filename: The basename of the file to write the runtime operational status to
    (when omitted no -f option is passed)
    @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
    @param raw: Obtain csv format data when logging to file
    @param interval: The polling interval (seconds) between logging to file
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    Note that when outputting data in the raw (csv) format, the column identifiers and their positions
    are defined by::

        Uptime = 0
        Number of monitors = 1
        Number of mthreads = 2
        Number of java applications = 3
        Number of listeners = 4
        Number of sub-listeners = 5
        Number of event types = 6
        Number of events on input queue = 7
        Number of events received = 8
        Number of events on the route queue = 9
        Number of events routed = 10
        Number of attached consumers = 11
        Number of events on output queue = 12
        Number of output events created = 13
        Number of output events sent = 14
        Number of events processed = 15

    """
    command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_watch')
    displayName = "engine_watch"

    # Default stdout/stderr names, made unique per running instance.
    instances = self.parent.getInstanceCount(displayName)
    dstdout = os.path.join(self.parent.output, 'watch.out')
    dstderr = os.path.join(self.parent.output, 'watch.err')
    if instances:
        dstdout = "%s.%d" % (dstdout, instances)
        dstderr = "%s.%d" % (dstderr, instances)

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr)

    # filename is optional, so only build the output path when it was given;
    # os.path.join cannot accept None.
    if not filedir: filedir = self.parent.output
    outputPath = os.path.join(filedir, filename) if filename else None

    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if outputPath: arguments.extend(["-f", outputPath])
    if raw: arguments.append("--raw")
    if interval: arguments.extend(["-i", "%d" % interval])
    if xargs.arguments: arguments.extend(xargs.arguments)

    return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
283
284
286 """Inject MonitorScript into the Correlator.
287
288 @param filenames: List of the basename of monitorscript files to inject into the Correlator
289 @param filedir: Directory containing the input monitorscript files (defaults to testcase input directory)
290 @param utf8: Assume input is in UTF8
291 @param xargs: Variable argument list for the additional supported process parameters
292
293 """
294 return self.__inject(filenames, filedir, utf8, False, **xargs)
295
296
def injectJMON(self, filename, filedir=None, **xargs):
    """Inject a JMON java application into the Correlator.

    Delegates to the private inject implementation with a one element file
    list, utf8 disabled and the java flag enabled.

    @param filename: The basename of the jar file to inject into the Correlator
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the private inject implementation

    """
    jarFiles = [filename]
    return self.__inject(jarFiles, filedir, False, True, **xargs)
306
307
def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs):
    """Inject a Scenario into the Correlator.

    Runs the event_modeler jar to generate monitorscript from the scenario
    definition file and, when the generation process returns a true value,
    injects the generated monitor.

    @param filename: The basename of the scenario definition file to inject into the Correlator
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param debug: Generate debug monitorscript
    @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location
    @param functions: Sequence of tuples, where each tuple contains a function catalog name and location
    @param xargs: Variable argument list for the additional supported process parameters

    """
    jarbase = "event_modeler"
    displayName = "%s.jar" % jarbase
    javaExecutable = os.path.join(PROJECT.APAMA_COMMON_JRE, "bin", "java")

    # Default stdout/stderr names, suffixed with the instance count when non-zero.
    runCount = self.parent.getInstanceCount(displayName)
    suffix = ".%d" % runCount if runCount else ""
    defaultOut = os.path.join(self.parent.output, "%s.out" % jarbase) + suffix
    defaultErr = os.path.join(self.parent.output, "%s.err" % jarbase) + suffix

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, stdout=defaultOut, stderr=defaultErr)

    # Write the sm-config.xml catalog configuration read by the generator.
    self.__createScenarioManagerConfig(blocks, functions)

    # The generated monitor goes to the output directory, named after the scenario file.
    scenarioStem = os.path.splitext(filename)[0]
    monitorName = os.path.join(self.parent.output, scenarioStem + ".mon")
    if not filedir:
        filedir = self.parent.input
    sdfName = os.path.join(filedir, filename)

    arguments = [
        "-Djava.awt.headless=true",
        "-DAPAMA_LOG_IMPL=simple",
        "-DAPAMA_LOG_LEVEL=DEBUG",
        "-jar", os.path.join(PROJECT.APAMA_HOME, 'lib', '%s.jar' % jarbase),
        "-c", os.path.join(self.parent.output, "sm-config.xml"),
    ]
    if not debug:
        arguments.extend(["-XgenerateDebug", "false"])
    arguments.extend(["-Xgenerate", sdfName, monitorName])

    status = self.parent.startProcess(javaExecutable, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)

    if status:
        self.__inject([monitorName])
359
360
def send(self, filenames=[], filedir=None, loop=None, utf8=False, doNoBatch=False, **xargs):
    """Send events into the Correlator.

    @param filenames: List (or tuple) of the basenames of event files to send into the Correlator;
    a single basename string is also accepted
    @param filedir: Directory containing the input event files (defaults to testcase input directory)
    @param loop: Number of times to loop through the input file
    @param utf8: Assume input is in UTF8
    @param doNoBatch: Do not automatically batch events
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    """
    command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_send')
    displayName = "engine_send"

    # Default stdout/stderr names, made unique per running instance.
    instances = self.parent.getInstanceCount(displayName)
    dstdout = os.path.join(self.parent.output, 'send.out')
    dstderr = os.path.join(self.parent.output, 'send.err')
    if instances:
        dstdout = "%s.%d" % (dstdout, instances)
        dstderr = "%s.%d" % (dstderr, instances)

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr)

    # Resolve the input files against filedir. isinstance (rather than an
    # exact type identity test) also accepts subclasses and tuples.
    if not filedir: filedir = self.parent.input
    if isinstance(filenames, (list, tuple)):
        files = [os.path.join(filedir, name) for name in filenames]
    elif isinstance(filenames, str):
        files = [os.path.join(filedir, filenames)]
    else:
        # Best effort: report the problem and carry on with no input files.
        files = []
        self.parent.log.error("Input parameter for filenames is not a string or list type")

    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if loop: arguments.extend(["--loop", "%s" % loop])
    if utf8: arguments.append("--utf8")
    if doNoBatch: arguments.append("--doNoBatch")
    if xargs.arguments: arguments.extend(xargs.arguments)
    arguments.extend(files)

    return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
408
409
def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
    """Delete named objects from the Event Correlator.

    @param names: List of names to delete from the Correlator
    @param filename: The basename of a file containing a set of names to delete
    @param filedir: The directory containing filename (defaults to testcase input subdirectory)
    @param force: Force deletion of names even if they are in use
    @param kill: Kill name even if it is a running monitor
    @param all: Delete everything in the Correlator
    @param utf8: Assume input is in UTF8
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    """
    command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_delete')
    displayName = "engine_delete"

    # Default stdout/stderr names, made unique per running instance.
    instances = self.parent.getInstanceCount(displayName)
    dstdout = os.path.join(self.parent.output, 'delete.out')
    dstderr = os.path.join(self.parent.output, 'delete.err')
    if instances:
        dstdout = "%s.%d" % (dstdout, instances)
        dstderr = "%s.%d" % (dstderr, instances)

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr)

    # filename is optional. When given, always resolve it against a
    # directory: the caller's filedir if supplied, else the testcase input
    # directory, so that an explicit filedir is honoured.
    if filename:
        if not filedir: filedir = self.parent.input
        filename = os.path.join(filedir, filename)

    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if filename: arguments.extend(["-f", filename])
    if force: arguments.append("--force")
    if kill: arguments.append("--kill")
    if all: arguments.extend(["--all", "--yes"])
    if utf8: arguments.append("--utf8")
    if xargs.arguments: arguments.extend(xargs.arguments)
    if names: arguments.extend(names)

    return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
456
457
459 """Execute management operations against the Correlator.
460
461 @param xargs: Variable argument list for the additional supported process parameters
462
463 """
464
465 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_management')
466 displayName = "engine_management"
467
468
469 instances = self.parent.getInstanceCount(displayName)
470 dstdout = os.path.join(self.parent.output, 'manage.out')
471 dstderr = os.path.join(self.parent.output, 'manage.err')
472 if instances: dstdout = "%s.%d" % (dstdout, instances)
473 if instances: dstderr = "%s.%d" % (dstderr, instances)
474
475
476 xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr)
477
478
479 arguments = []
480 arguments.extend(["-p", "%d" % self.port])
481 if self.host: arguments.extend(["-n", self.host])
482 if xargs.arguments: arguments.extend(xargs.arguments)
483
484
485 return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
486
487
def connect(self, source, channel=None, **xargs):
    """Connect a Correlator to this instance as a source.

    Runs engine_connect with the source Correlator's host and port as the
    source end and this instance's host and port as the target end.

    @param source: An instance of the L{CorrelatorHelper} class to act as the source
    @param channel: The channel to make the connection on
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    """
    displayName = "engine_connect"
    command = os.path.join(PROJECT.APAMA_BIN_DIR, displayName)

    # Default stdout/stderr names, suffixed with the instance count when non-zero.
    runCount = self.parent.getInstanceCount(displayName)
    suffix = ".%d" % runCount if runCount else ""
    defaultOut = os.path.join(self.parent.output, 'connect.out') + suffix
    defaultErr = os.path.join(self.parent.output, 'connect.err') + suffix

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, stdout=defaultOut, stderr=defaultErr)

    arguments = [
        "-sn", source.host,
        "-sp", "%d" % source.port,
        "-tn", self.host,
        "-tp", "%d" % self.port,
    ]
    if channel:
        arguments.extend(["-c", channel])
    if xargs.arguments:
        arguments.extend(xargs.arguments)

    return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
521
522
def disconnect(self, source, channel=None, **xargs):
    """Disconnect a source Correlator from this instance.

    Implemented by calling L{connect} with the -x option added to the process
    arguments, unless -x or --disconnect is already present.

    @param source: An instance of the L{CorrelatorHelper} class acting as the source
    @param channel: The channel to be disconnected
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by L{connect}

    """
    # Work on a copy so the caller's own arguments list is never modified;
    # a missing or None value is treated as an empty list.
    arguments = list(xargs.get('arguments') or [])
    if '-x' not in arguments and '--disconnect' not in arguments:
        arguments.append('-x')
    xargs['arguments'] = arguments
    return self.connect(source, channel, **xargs)
537
538
540 """Enable and disable application event logging.
541
542 Provides a wrapper around the engine_management command line tool to enable and disable
543 application event logging. Once enabled, application event logging will log to the correlator
544 log file information specific processing occurrences, e.g. the receipt of events for
545 processing, the triggering of listeners, execution of the garbage collector etc.
546
547 @param enable: Set to True to enable, set to False to disable event logging
548
549 """
550 if enable:
551 self.manage(arguments=['-r', 'applicationEventLogging on'])
552 else:
553 self.manage(arguments=['-r', 'applicationEventLogging off'])
554
555
557 """Set the application log file name.
558
559 On setting the application log file details, the output of all native log commands within
560 MonitorScript will be logged to the designated log file. This allows separation between the
561 log statements written by the Event Correlator i.e. for status, errors etc, and those
562 generated by the actual application.
563
564 @param filename: The basename of the file to write the application log file to
565 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
566
567 """
568 if not filedir: filedir = self.parent.output
569 self.manage(arguments=['-r', 'setApplicationLogFile %s' % os.path.join(filedir, filename)])
570
571
573 """Set the application log level.
574
575 @param verbosity: The verbosity level of the application logging
576
577 """
578 self.manage(arguments=['-r', 'setApplicationLogLevel %s' % verbosity])
579
580
582 """Inform the Event Correlator to start collecting profiling statistics. """
583 self.manage(arguments=['-r', 'profiling on'])
584
585
587 """Inform the Event Correlator to stop collecting profiling statistics. """
588 self.manage(arguments=['-r', 'profiling off'])
589
590
592 """Inform the Event Correlator to reset its collection of profiling statistics. """
593 self.manage(arguments=['-r', 'profiling reset'])
594
595
597 """Obtain the latest profiling statistics from the Event Correlator.
598
599 @param filename: The basename of the file to write the profiling statistics to
600 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
601
602 """
603 if not filedir: filedir = self.parent.output
604 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','profiling get'])
605
606
608 """Obtain a stringified representation of the current application state from the Event Correlator.
609
610 @param filename: The basename of the file to write the dump of application state to
611 @param filedir: The directory to write filename to (defaults to testcase output subdirectory)
612
613 """
614 if not filedir: filedir = self.parent.output
615 self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'])
616
617
619 """Block until the Correlator declares itself to be ready for processing.
620
621 """
622
623 command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_management')
624
625
626 stdout = os.path.join(self.parent.output, 'engine_management.out')
627 stderr = os.path.join(self.parent.output, 'engine_management.err')
628
629
630 arguments = []
631 arguments.append("-v")
632 arguments.append("-w")
633 arguments.extend(["-p", "%d" % self.port])
634 arguments.extend(["-n", self.host])
635
636 self.parent.log.info("Waiting for correlator to come up ...")
637 try:
638 process = ProcessWrapper(command, arguments, self.environ, self.parent.output, FOREGROUND, timeout, stdout, stderr)
639 process.start()
640 except ProcessError, e:
641 raise Exception("Unable to start engine_management to check for correlator up: %s"%e)
642 except ProcessTimeout:
643 process.stop()
644 raise Exception("Timed out after %d seconds waiting for correlator to come up"%timeout)
645
646
def flush(self, timeout=60, count=1, **xargs):
    """Make sure all events have been flushed through the correlator.

    Currently implemented by using the flushAllQueues management request.
    Will initiate a cycle where each queue in the correlator is drained,
    optionally repeated count times. This is useful when you have a
    multi-context application.

    @param timeout: The amount of time to wait
    @param count: The number of times to ensure queues are flushed
    @param xargs: Variable argument list for the additional supported process parameters; any
    supplied arguments are appended after the flushAllQueues request
    @return: The value returned by the manage call

    """
    flushAllQueues = 'flushAllQueues'
    if count > 1:
        flushAllQueues = 'flushAllQueues %d' % count

    # Forward the documented extra process parameters instead of silently
    # dropping them. xargs is this call's own dictionary, so popping from it
    # does not affect the caller.
    arguments = ['-r', flushAllQueues]
    arguments.extend(xargs.pop('arguments', None) or [])
    return self.manage(arguments=arguments, timeout=timeout, **xargs)
665
666
def __inject(self, filenames=[], filedir=None, utf8=False, java=False, **xargs):
    """Inject an application into the correlator.

    @param filenames: List (or tuple) of the basenames of the files to inject; a single basename
    string is also accepted
    @param filedir: Directory containing the input files (defaults to testcase input directory)
    @param utf8: Assume input is in UTF8
    @param java: Pass the --java option to engine_inject
    @param xargs: Variable argument list for the additional supported process parameters
    @return: The value returned by the parent testcase startProcess call

    """
    command = os.path.join(PROJECT.APAMA_BIN_DIR, 'engine_inject')
    displayName = "engine_inject"

    # Default stdout/stderr names, made unique per running instance.
    instances = self.parent.getInstanceCount(displayName)
    dstdout = os.path.join(self.parent.output, 'inject.out')
    dstderr = os.path.join(self.parent.output, 'inject.err')
    if instances:
        dstdout = "%s.%d" % (dstdout, instances)
        dstderr = "%s.%d" % (dstderr, instances)

    # Merge caller-supplied process options over the defaults.
    xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr)

    # Resolve the input files against filedir. isinstance (rather than an
    # exact type identity test) also accepts subclasses and tuples.
    if not filedir: filedir = self.parent.input
    if isinstance(filenames, (list, tuple)):
        files = [os.path.join(filedir, name) for name in filenames]
    elif isinstance(filenames, str):
        files = [os.path.join(filedir, filenames)]
    else:
        # Best effort: report the problem and carry on with no input files.
        files = []
        self.parent.log.error("Input parameter for filenames is not a string or list type")

    arguments = ["-p", "%d" % self.port]
    if self.host: arguments.extend(["-n", self.host])
    if utf8: arguments.append("--utf8")
    if java: arguments.append("--java")
    if xargs.arguments: arguments.extend(xargs.arguments)
    arguments.extend(files)

    return self.parent.startProcess(command, arguments, self.environ, xargs.workingDir, xargs.state, xargs.timeout, xargs.stdout, xargs.stderr, displayName)
706
707
709 """Create the Scenario Manager configuration file for the location of all blocks and functions.
710
711 """
712 impl = getDOMImplementation()
713 document = impl.createDocument(None, "config", None)
714 rootElement = document.documentElement
715
716
717 catalogsElement = document.createElement("catalogs")
718 rootElement.appendChild(catalogsElement)
719
720
721 blocksElement = document.createElement("blocks")
722
723 try:
724 try:
725 if PROJECT.BLOCKS_DIR == "":
726 raise Exception("")
727 except Exception:
728 PROJECT.BLOCKS_DIR = os.path.join(PROJECT.APAMA_HOME, "catalogs", "blocks")
729 except AttributeError:
730 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT")
731
732 tup = [("blocks", PROJECT.BLOCKS_DIR)]
733 if blocks: tup.extend(blocks)
734
735 for name, location in tup:
736 catalog = document.createElement("catalog")
737 nameAtttribute = document.createAttribute("name")
738 nameAtttribute.value=name
739 typeAtttribute = document.createAttribute("type")
740 typeAtttribute.value="FILE"
741 locationAtttribute = document.createAttribute("location")
742 locationAtttribute.value=location
743 catalog.setAttributeNode(nameAtttribute)
744 catalog.setAttributeNode(typeAtttribute)
745 catalog.setAttributeNode(locationAtttribute)
746 blocksElement.appendChild(catalog)
747 catalogsElement.appendChild(blocksElement)
748
749
750 functionsElement = document.createElement("functions")
751
752 try:
753 try:
754 if PROJECT.FUNCTIONS_DIR == "":
755 raise Exception("")
756 except Exception:
757 PROJECT.FUNCTIONS_DIR = os.path.join(PROJECT.APAMA_HOME, "catalogs", "functions")
758 except AttributeError:
759 raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT")
760
761 tup = [("functions", PROJECT.FUNCTIONS_DIR)]
762 if functions: tup.extend(functions)
763
764 for name, location in tup:
765 catalog = document.createElement("catalog")
766 nameAtttribute = document.createAttribute("name")
767 nameAtttribute.value=name
768 typeAtttribute = document.createAttribute("type")
769 typeAtttribute.value="FILE"
770 locationAtttribute = document.createAttribute("location")
771 locationAtttribute.value=location
772 catalog.setAttributeNode(nameAtttribute)
773 catalog.setAttributeNode(typeAtttribute)
774 catalog.setAttributeNode(locationAtttribute)
775 functionsElement.appendChild(catalog)
776 catalogsElement.appendChild(functionsElement)
777
778 fp = open(os.path.join(self.parent.output, "sm-config.xml"), "w")
779 fp.write(document.toprettyxml(indent=" "))
780 fp.close()
781