Source code for apama.correlator

#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights
# Copyright (c) 2013-2018 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains PySys extensions for starting and interacting with correlator processes. 
"""
from __future__ import print_function
import sys, os, string, logging, socket, copy, random, types

from pysys import log, ThreadFilter
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.pycompat import *

from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
from apama.common import XArgsHolder
from apama.common import stringToUnicode

from xml.dom.minidom import getDOMImplementation


[docs]class CorrelatorHelper(ApamaServerProcess): """Helper class for the Software AG Apama Event Correlator. The Correlator Helper class has been designed for use as an extension module to the PySys System Test Framework, offering the ability to configure, start and interact with an Event Correlator. The usage pattern of the class is to create an instance per Correlator, and to then make method calls onto the instance to perform operations such as the injection of EPL or java JMON applications, the sending of events, deletion of named objects etc. For example:: correlator = CorrelatorHelper(self, name='mycorrelator') correlator.start(logfile="mycorrelator.log") correlator.inject(filenames=["simple.mon"]) Process related methods of the class declare a method signature which includes named parameters for the most frequently used options to the method. They also declare the **xargs parameter to allow passing in of additional supported arguments to the process. The additional arguments that are currently supported via **xargs are:: workingDir: The default value for the working directory of a process state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND) timeout: The default value of the process timeout stdout: The default value of the process stdout stderr: The default value of the process stderr arguments: List of extra arguments to be passed to the process This means that legitimate calls to the start method include:: correlator.start(logfile="correlator.log") correlator.start(logfile="correlator.log", stdout="correlator1.out") correlator.start(state=FOREGROUND, timeout=5) @ivar parent: Reference to the PySys testcase instantiating this class instance @type parent: pysys.basetest @ivar port: Port used for starting and interaction with the Correlator @type port: integer @ivar host: Hostname for interaction with a remote Correlator @type host: string @ivar environ: The environment for running the Correlator @type environ: dictionary """ """ Holds the Java classpath used when starting a correlator with JVM.""" classpath = None licence = None
[docs] def __init__(self, parent, port=None, host=None, name='correlator'): """Create an instance of the CorrelatorHelper class. If no port parameter is used in the argument list an available port will be dynamically found from the OS and used for starting the Correlator, and performing all operations against it. The host parameter is only used to perform operations against a remote Correlator started external to the PySys framework - the class does not support the starting of a Correlator remote to the localhost. @param parent: Reference to the parent PySys testcase @param port: The port used for starting and interacting with the Correlator @param host: The hostname used for interaction with a remote Correlator @param name: A display name for this process (default is "correlator"), also used for the default stdout/err filenames. """ ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host) self.injectJMON = self.injectJava # old alias for this method """ Alias for injectJava @see: L{injectJava} """ self.injectMon = self.injectEPL # shorter alias for injecting a .mon file """ Alias for injectEPL @see: L{injectEPL} """ self.injectMonitorscript = self.injectEPL # Backwards compatibility """ Alias for injectEPL @see: L{injectEPL} """ self.licence = os.path.join(self.parent.project.APAMA_WORK, 'license', 'ApamaServerLicense.xml') if not os.path.exists(self.licence): # Don't put licence on the command line if it doesn't exist - run # in evaluation mode, or find it automatically self.licence = None """ Set the licence file location. """
[docs] def addToClassPath(self, path): """Add the supplied path to the Java classpath variable for starting this instance. """ if self.classpath == None: self.classpath = os.path.normpath(path) else: self.classpath = r"%s%s%s" % (self.classpath, ENVSEPERATOR, os.path.normpath(path))
[docs] def addToPath(self, path): """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this instance. """ if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" else: key = "PATH" if key in self.environ: self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path)) else: self.environ[key] = os.path.normpath(path)
[docs] def start(self, logfile=None, verbosity=None, java=False, Xclock=None, environ=None, inputLog=None, waitForServerUp=True, config = None, **xargs): """Start the Correlator. @param logfile: Name of the Correlator log file (if used, set this to something similar to the display "name" passed to the constructor) @param verbosity: The verbosity level of the Correlator logging @param java: If pysys.constants.False then the Correlator will be started with support for JMON applications @param Xclock: If pysys.constants.True then the Correlator will be started in externally clocked mode @param environ: Map of environment variables to override. @param inputLog: Relative or absolute path of file to write input log to, containing all events, EPL and other inputs sent to the correlator. The format of the input log may change without notice so should not be replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the input log can also be used to pass information to customer support in the event of a problem. @param waitForServerUp: Set to False to disable automatically waiting until the component is ready @param config: path or list of paths to a initialization or connectivity configuration file or directory containing them @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'correlator') dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name) # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project) # set the arguments to the process arguments = [] arguments.extend(["--name", self.name, "-p", "%d" % self.port]) if self.licence is not None: arguments.extend(["-l", "%s" % self.licence]) if logfile: arguments.extend(["-f", logfile]) if verbosity: arguments.extend(["-v", verbosity]) if java: arguments.append("--java") if inputLog: arguments.extend(["--inputLog", os.path.join(self.parent.output, inputLog)]) if Xclock: arguments.append("-Xclock") if config != None: if isstring(config): arguments.extend(['--config', config]) elif type(config) is list: for param in config: arguments.extend(['--config', param]) else: raise Exception("Input parameter for config is not a string or list type") if java and self.classpath: arguments.extend(["--javaopt", "-Djava.class.path=%s" % self.classpath]) arguments.extend(xargs.arguments) # start the process - unicoding environment needs moving into the PySys framework env = {} if getattr(self.parent, 'eplcoverage','').lower()=='true': env['AP_EPL_COVERAGE_FILE'] = 'correlator.${PID}.eplcoverage' for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) if environ: for key in environ: env[stringToUnicode(key)] = stringToUnicode(environ[key]) if self.process and self.process.running(): raise Exception('Cannot start component as an instance is already running: %s'%self) self.process = None try: hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs) self.process = hprocess if waitForServerUp: self.waitForComponentUp(timeout=xargs.timeout) except Exception: for f in [logfile, xargs.stdout, xargs.stderr]: if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*', 'Invalid Correlator argument.*'], tail=True): break raise return hprocess
[docs] def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False, logChannels=False, **xargs): """Attach a receiver to the Correlator. Returns the process for the receiver. @param filename: The basename of the file to write events received from the Correlator to @param filedir: The directory to write filename to (defaults to testcase output subdirectory) @param channels: List of channel names to subscribe to @param logChannels: Print the channel each event came from in the output @param suppressBatch: Do not include BATCH timestamps in the output @param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived @param utf8: Write output in UTF8 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_receive') displayName = 'engine_receive <%s> [%s]'%(self.name, os.path.basename(filename)) # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'receive') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set location of output file (defaults to output subdirectory) if not filedir: filedir = self.parent.output file = os.path.join(filedir, filename) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if filename: arguments.extend(["-f", file]) if suppressBatch: arguments.append("--suppressbatch") if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch") if utf8: arguments.append("--utf8") if logChannels: arguments.append("--logChannels") arguments.extend(xargs.arguments) for channel in channels: arguments.extend(['--channel', channel]) # start the process proc = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs) self.parent.waitForFile(filename, filedir = filedir) return proc
[docs] def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs): """Obtain runtime operational statistics from the Correlator. By default this runs as a BACKGROUND process. The process is returned by the method call. @param filename: The basename of the file to write the runtime operational status to @param filedir: The directory to write filename to (defaults to testcase output subdirectory) @param raw: Obtain csv format data when logging to file @param interval: The polling interval (seconds) between logging to file @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir @note: When outputing data in the raw (csv) format, the column identifiers and their positions are defined by: - Uptime = 0 - Number of monitors = 1 - Number of mthreads = 2 - Number of java applications = 3 - Number of listeners = 4 - Number of sub-listeners = 5 - Number of event types = 6 - Number of events on input queue = 7 - Number of events received = 8 - Number of events on the route queue = 9 - Number of events routed = 10 - Number of attached consumers = 11 - Number of events on output queue = 12 - Number of output events created = 13 - Number of output events sent = 14 - Number of events processed = 15 """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_watch') # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'watch') displayName = "engine_watch <%s> -> %s"%(self.name, os.path.basename(filename or dstdout)) # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set location of output file (defaults to output subdirectory) if not filedir: filedir = self.parent.output # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if filename: arguments.extend(["-f", os.path.join(filedir, filename)]) if raw: arguments.append("--raw") if interval: arguments.extend(["-i", "%d" % interval]) arguments.extend(xargs.arguments) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def inspect(self, filename='inspect.txt', filedir=None, raw=False, **xargs): """Obtain information about what application(s) have been injected into the Correlator and what listeners are in existence. This runs as a FOREGROUND process. @param filename: The basename of the file to write the information to, e.g. inspect.txt @param filedir: The directory to write filename to (defaults to testcase output subdirectory) @param raw: Use parser-friendly output format @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ assert filename # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_inspect') displayName = "engine_inspect <%s> -> %s"%(self.name, os.path.basename(filename)) # set the default stdout and stderr dstdout,dstderr = os.path.join(self.parent.output, filename), os.path.join(self.parent.output, filename.replace('.txt','')+'.err') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, state=FOREGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set location of output file (defaults to output subdirectory) if not filedir: filedir = self.parent.output file = os.path.join(filedir, filename) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if raw: arguments.append("--raw") arguments.extend(xargs.arguments) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def initialize(self, path, correlatorName=None, properties=None, include=None, exclude=None, **xargs): """Initialize the Correlator by injecting all the files making up the project, typically based on a Designer launch configuration .deploy file. This is usually the simplest way to inject all the files from an application into the correlator. Alternative approaches are to call the L{injectEPL} and related methods individually for each file, or to specify the files in the "initialization" section of a yaml file passed into the correlator L{start} call using the "config" argument. Queries and Digital Event Services .mon files will be generated automatically as part of injection, but any Java jar files must be compiled manually before invoking this method. @param path: Path of a .deploy file from Designer (recommended), a directory, or a text file listing the files to be injected. Must be an absolute path, or relative to the testcase output dir. @param correlatorName: The name of the Correlator as specified in the launch configuration .deploy file, e.g "defaultCorrelator". If not specified, the name of this pysys correlator will be used. @param properties: Optional path to a .properties file specifying ${var} placeholders to be used for resolving the paths of any files outside the project directory. Absolute path or relative to output dir. @param include: a comma-separated string specifying which of the project files found by the tool should be injected, e.g. "**/foo/Bar*.evt,**.mon". If not specified, all files will be included (unless specifically excluded) @param exclude: a comma-separated string specifying which of the project files found by the tool should NOT be injected, e.g. "**/foo/Bar*.evt". @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'initialize-%s'%self.name) # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set the input and output names for qry and mon path = os.path.join(self.parent.output, path) # set arguments to the process arguments = [] arguments.append("-Djava.awt.headless=true") arguments.append("-DAPAMA_LOG_IMPL=log4j") arguments.append("-Dlog4j.configuration=file:///%s/etc/engine-deploy-java-log4j.properties"%self.parent.project.APAMA_HOME.replace('\\','/')) arguments.append("-Djava.io.tmpdir=%s"%self.parent.output) arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', 'ap-generate-project-init-list.jar')]) arguments.append(path) if properties: assert isstring(properties) arguments.append(os.path.join(self.parent.output, properties)) if correlatorName or ('!' not in path): arguments.extend(['--correlatorName', correlatorName or self.name]) arguments.extend(['--inject', self.host, str(self.port)]) if include: assert isstring(include) arguments.extend(['--include', include]) if exclude: assert isstring(exclude) arguments.extend(['--exclude', exclude]) arguments.extend(xargs.arguments) # start the process to generate the EPL try: status = self.parent.startProcess(command, arguments, self.environ, displayName='initialize <%s> [%s]'%(self.name, os.path.basename(path)), **xargs.kwargs) finally: self.parent.logFileContents(xargs.stderr, maxLines=50) self.parent.logFileContents(xargs.stdout, maxLines=50) # in case there are queries involved return status
[docs] def injectEPL(self, filenames=[], filedir=None, utf8=False, **xargs): """Inject EPL *.mon files into the Correlator. See also L{initialize}. @param filenames: List of the basename of EPL files to inject into the Correlator @param filedir: Directory containing the input EPL files (defaults to testcase input directory) @param utf8: Assume input is in UTF8 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ return self.__inject(filenames, filedir, utf8, False, False, **xargs)
[docs] def injectJava(self, filename, filedir=None, **xargs): """Inject a Java plug-in or application into the Correlator. See also L{initialize}. @param filename: The basename of the jar file to inject into the Correlator @param filedir: The directory containing filename (defaults to testcase input subdirectory) @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ return self.__inject([filename], filedir, False, True, False, **xargs)
[docs] def injectCDP(self, filenames=[], filedir=None, **xargs): """Inject Correlator deployment package into the Correlator. See also L{initialize}. @param filenames: List of the basename of cdp files to inject into the Correlator @param filedir: Directory containing the input cdp files (defaults to testcase input directory) @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ return self.__inject(filenames, filedir, False, False, True, **xargs)
[docs] def injectScenario(self, filename, filedir=None, debug=False, blocks=None, functions=None, **xargs): """Inject a Scenario into the Correlator. See also L{initialize}. @param filename: The basename of the scenario definition file to inject into the Correlator @param filedir: The directory containing filename (defaults to testcase input subdirectory) @param debug: Generate debug EPL @param blocks: Sequence of tuples, where each tuple contains a block catalog name and location @param functions: Sequence of tuples, where each tuple contains a function catalog name and location @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") jarbase = "ap-modeler" # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase) # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # creae the scenario manager configuration file self.__createScenarioManagerConfig(blocks, functions) # set the input and output names for sdf and mon monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon") if not filedir: filedir = self.parent.input sdfName = os.path.join(filedir, filename) # set arguments to the process arguments = [] arguments.append("-Djava.awt.headless=true") arguments.append("-DAPAMA_LOG_IMPL=simple") arguments.append("-DAPAMA_LOG_LEVEL=INFO") arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) arguments.extend(["-c", os.path.join(self.parent.output, "sm-config.xml")]) if not debug: arguments.extend(["-XgenerateDebug", "false"]) arguments.extend(["-Xgenerate", sdfName, monitorName]) arguments.extend(xargs.arguments) ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) # start the process to generate the EPL status = self.parent.startProcess(command, arguments, self.environ, displayName='scenario generation for %s'%os.path.basename(sdfName), ignoreExitStatus=True, **xargs.kwargs) # if successful inject the EPL if status and status.exitStatus == 0: status = self.__inject([monitorName]) if status and status.exitStatus == 0: # delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc os.remove(monitorName) if status and status.exitStatus != 0 and not ignoreExitStatus: self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
[docs] def injectQuery(self, filename, filedir=None, diagnostics=False, **xargs): """Inject a Query into the Correlator. See also L{initialize}. @param filename: The basename of the query file to inject into the Correlator @param filedir: The directory containing filename (defaults to testcase input subdirectory) @param diagnostics: Enable runtime diagnostic logging in the query @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_COMMON_JRE, "bin", "java") jarbase = "ap-query-codegen" # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase) # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set the input and output names for qry and mon monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon") if not filedir: filedir = self.parent.input qryName = os.path.join(filedir, filename) # set arguments to the process arguments = [] arguments.append("-Djava.awt.headless=true") arguments.append("-DAPAMA_LOG_IMPL=simple") arguments.append("-DAPAMA_LOG_LEVEL=INFO") arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)]) arguments.extend(["--host", self.host, "--port", '%s' % self.port, qryName, monitorName]) if diagnostics: arguments.extend(["--diagnostics"]) arguments.extend(xargs.arguments) ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) # start the process to generate the EPL status = self.parent.startProcess(command, arguments, self.environ, displayName='query generation for %s'%os.path.basename(qryName), ignoreExitStatus=True, **xargs.kwargs) # if successful, inject the EPL if status and (status.exitStatus==0): status = self.__inject([monitorName]) if status and status.exitStatus==0: # delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc try: os.remove(monitorName) except Exception as e: self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e) self.flush(count=6) if status and status.exitStatus != 0 and not ignoreExitStatus: self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
[docs] def sendEventStrings(self, *eventStrings, **xargs): """Send one or more event strings into the Correlator. This method writes a temporary file containing the specified strings. See the documentation for engine_send for more information. @param eventStrings: One or more event strings to be sent to this correlator. May be unicode or UTF-8 byte strings. May include a channel designator. Example: self.sendEventStrings('mypackage.Event1()', 'mypackage.Event2("Hello World")') @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir @keyword channel: The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts. """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send') # set the default stdout and stderr dstdout,dstderr = self.parent.allocateUniqueStdOutErr('send') assert eventStrings # must not be empty assert not isstring(eventStrings) # should be a list of strings, not a string # transform xargs into an instance of the xargs holder class tmpfile = dstderr.replace('.err','')+'.tmp.evt' with openfile(tmpfile,'w', encoding='utf-8') as f: for l in eventStrings: if isinstance(l, binary_type): # we have to guess at the encoding; utf-8 is slightly safer than local/default encoding # since at least 7-bit ascii chars will always work l = l.decode('utf-8') print(l.strip(),file=f) displayName = "engine_send <%s> [%s%s]"%(self.name, eventStrings[0], ' (+%d others)'%(len(eventStrings)-1) if len(eventStrings)>1 else '') # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) arguments.append('--utf8') if xargs.get('channel',''): arguments.extend(["--channel", xargs.pop('channel')]) arguments.append(tmpfile) kwargs = xargs xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) arguments.extend(xargs.arguments) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def send(self, filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs): """Send events from one or more file into the Correlator. See the documentation for engine_send for more information. @param filenames: List of the basename of event files to send into the Correlator @param filedir: Directory containing the input event files (defaults to testcase input directory) @param loop: Number of times to loop through the input file @param utf8: Assume input is in UTF8 @param channel: The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts. @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_send') # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'send') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set location of input files (defaults to input directory) files=[] if not filedir: filedir = self.parent.input if isinstance(filenames,list): for file in filenames: files.append(os.path.join(filedir, file)) elif isstring(filenames): files.append(os.path.join(filedir, filenames)) else: raise Exception("Input parameter for filenames is not a string or list type") displayName = "engine_send <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files))) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if loop: arguments.extend(["--loop", "%s" % loop]) if utf8: arguments.append("--utf8") if channel: arguments.extend(["--channel", channel]) arguments.extend(xargs.arguments) arguments.extend(files) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs): """Delete named objects from the Event Crrelator. @param names: List of names to delete from the Correlator @param filename: The basename of a file containing a set of names to delete @param filedir: The directory containing filename (defaults to testcase input subdirectory) @param force: Force deletion of names even if they are in use @param kill: Kill name even if it is a running monitor @param all: Delete everything in the Correlator @param utf8: Assume input is in UTF8 @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_delete') displayName = "engine_delete <%s> %s"%(self.name, '<all>' if all else '[%s]'%(' '.join(names))) # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'delete') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set location of input files (defaults to input directory) if not filedir and filename: filedir = self.parent.input filename = os.path.join(filedir, filename) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if filename: arguments.extend(["-f", filename]) if force: arguments.append("--force") if kill: arguments.append("--kill") if all: arguments.extend(["--all","--yes"]) if utf8: arguments.append("--utf8") arguments.extend(xargs.arguments) if names: arguments.extend(names) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def connect(self, source, channel=None, channels=None, mode=None, **xargs): """Connect a Correlator to this instance as a source. @param source: An instance of the L{CorrelatorHelper} class to act as the source @param channel: The channel to make the connection on @param channels: The list of channels to make the connection on @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'engine_connect') displayName = "engine_connect %s<%s channel '%s' -> %s>"%( 'disconnect ' if ('-x' in xargs.get('arguments',[]) or '--disconnect' in xargs.get('arguments',[])) else '', source.name, channel or '*', self.name) # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'connect') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set the arguments to the process arguments = [] arguments.extend(["-sn", source.host]) arguments.extend(["-sp", "%d" % source.port]) arguments.extend(["-tn", self.host]) arguments.extend(["-tp", "%d" % self.port]) if channel: arguments.extend(["-c", channel]) if channels: assert not isstring(channels) # should be a list of strings, not a string for c in channels: arguments.extend(["-c", c]) if mode: arguments.extend(["-m", mode]) arguments.extend(xargs.arguments) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def disconnect(self, source, channel=None, channels=None, mode=None, **xargs): """Disconnect a correlator to this instance as a source correlator. @param source: An instance of the L{CorrelatorHelper} class acting as the source @param channel: The channel to be disconnected @param channels: The list of channels to be disconnected @param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ if 'arguments' in xargs: if xargs['arguments'].count('-x') == 0 and xargs['arguments'].count('--disconnect') == 0: xargs['arguments'].append('-x') else: xargs['arguments'] = ['-x'] self.connect(source, channel, channels, mode, **xargs)
[docs] def applicationEventLogging(self, enable=True, **xargs): """Enable and disable application event logging. Provides a wrapper around the engine_management command line tool to enable and disable application event logging. Once enabled, application event logging will log to the correlator log file information specific processing occurrences, e.g. the receipt of events for processing, the triggering of listeners, execution of the garbage collector etc. @param enable: Set to True to enable, set to False to disable event logging """ if enable: self.manage(arguments=['-r', 'applicationEventLogging', 'on'], **xargs) else: self.manage(arguments=['-r', 'applicationEventLogging', 'off'], **xargs)
[docs] def setApplicationLogFile(self, filename=None, filedir=None, **xargs): """Set the application log file name. On setting the application log file details, the output of all native log commands within EPL will be logged to the designated log file. This allows separation between the log statements written by the Event Correlator i.e. for status, errors etc, and those generated by the actual application. @param filename: The basename of the file to write the application log file to @param filedir: The directory to write filename to (defaults to testcase output subdirectory) """ if not filedir: filedir = self.parent.output self.manage(arguments=['-r', 'setApplicationLogFile', os.path.join(filedir, filename)], **xargs)
[docs] def setApplicationLogLevel(self, verbosity, **xargs): """Set the application log level. @param verbosity: The verbosity level of the application logging """ self.manage(arguments=['-r', 'setApplicationLogLevel', verbosity], **xargs)
[docs] def profilingOn(self, **xargs): """Inform the Event Correlator to start collecting profiling statistics. """ self.manage(arguments=['-r', 'cpuProfile', 'on'], **xargs)
[docs] def profilingOff(self, **xargs): """Inform the Event Correlator to stop collecting profiling statistics. """ self.manage(arguments=['-r', 'cpuProfile', 'off'], **xargs)
[docs] def profilingReset(self, **xargs): """Inform the Event Correlator to reset it's collection of profiling statistics. """ self.manage(arguments=['-r', 'cpuProfile', 'reset'], **xargs)
[docs] def profilingGet(self, filename, filedir=None, **xargs): """Obtain the latest profiling statistics from the Event Correlator. @param filename: The basename of the file to write the profiling statistics to @param filedir: The directory to write filename to (defaults to testcase output subdirectory) """ if not filedir: filedir = self.parent.output self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','cpuProfile', 'get'], **xargs)
[docs] def toStringAll(self, filename, filedir=None, **xargs): """Obtain a stringified representation of the current application state from the Event Correlator. @param filename: The basename of the file to write the dump of application state to @param filedir: The directory to write filename to (defaults to testcase output subdirectory) """ if not filedir: filedir = self.parent.output self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'], **xargs)
[docs] def waitForCorrelatorUp(self, *args, **kwargs): """Block until the Correlator declares itself to be ready for processing. @deprecated: Use waitForComponentUp instead. """ self.waitForComponentUp(*args, **kwargs)
[docs] def flush(self, timeout=60, count=1, **xargs): """Make sure all events have been flushed through the correlator. Currently implemented by using the flushAllQueues management request. Will initate a cycle where each queue in the correlator is drained, optionally repeated count times. This is useful when you have a multi-context application. @param timeout: The amount of time to wait @param count: The number of times to ensure queues are flushed @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ flushAllQueues = 'flushAllQueues' if count > 1: self.manage(arguments=['-r', 'flushAllQueues', str(count)], timeout=timeout, **xargs) else: self.manage(arguments=['-r', 'flushAllQueues'], timeout=timeout, **xargs)
def __inject(self, filenames=[], filedir=None, utf8=False, java=False, cdp=False, **xargs): """Inject an application into the correlator. Returns the process object (or None in some error cases) """ # set the command and display name command = os.path.join(self.parent.project.APAMA_BIN_DIR,'engine_inject') # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'inject') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set location of input files (defaults to input directory) files=[] if not filedir: filedir = self.parent.input if isinstance(filenames,list): for file in filenames: files.append(os.path.join(filedir, file)) elif isstring(filenames): files.append(os.path.join(filedir, filenames)) else: raise Exception("Input parameter for filenames is not a string or list type") if not files: raise Exception("Cannot call inject without specifying the files to be injected") displayName = "engine_inject <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files))) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if utf8: arguments.append("--utf8") if java: arguments.append("--java") if cdp: arguments.append("--cdp") arguments.extend(xargs.arguments) arguments.extend(files) # remove from xargs so we don't pass it to startProcess directly ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None) # start the process result = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, ignoreExitStatus=True, **xargs.kwargs) firstwarningline = '' if result and result.exitStatus != 0: with openfile(xargs.stderr) as f: # bytes on python2, chars with default encoding on python3 for l in f: l = l.replace(self.parent.input+os.sep,'').strip() if not l: continue if not firstwarningline: firstwarningline = l self.parent.log.warning(' %s'%l) if result and result.exitStatus != 0 and not ignoreExitStatus: self.parent.addOutcome(BLOCKED, '%s failed: %s'%(result, firstwarningline) if firstwarningline else '%s returned non-zero exit code %d'%(result, result.exitStatus), abortOnError=self.parent.defaultAbortOnError) return result def __createScenarioManagerConfig(self, blocks=None, functions=None): """Create the Scenario Manager configuration file for the location of all blocks and functions. """ impl = getDOMImplementation() document = impl.createDocument(None, "config", None) rootElement = document.documentElement # create the catalogs entry catalogsElement = document.createElement("catalogs") rootElement.appendChild(catalogsElement) # create the blocks element blocksElement = document.createElement("blocks") try: try: if self.parent.project.BLOCKS_DIR == "": raise Exception("") except Exception: self.parent.project.BLOCKS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "blocks") except AttributeError: raise Exception("Project variable APAMA_HOME undefined in .pysysproject and BLOCKS_DIR unset for PROJECT") tup = [("blocks", self.parent.project.BLOCKS_DIR)] if blocks: tup.extend(blocks) for name, location in tup: catalog = document.createElement("catalog") nameAtttribute = document.createAttribute("name") nameAtttribute.value=name typeAtttribute = document.createAttribute("type") typeAtttribute.value="FILE" locationAtttribute = document.createAttribute("location") locationAtttribute.value=location catalog.setAttributeNode(nameAtttribute) catalog.setAttributeNode(typeAtttribute) catalog.setAttributeNode(locationAtttribute) blocksElement.appendChild(catalog) catalogsElement.appendChild(blocksElement) # create the functions elememt functionsElement = document.createElement("functions") try: try: if self.parent.project.FUNCTIONS_DIR == "": raise Exception("") except Exception: self.parent.project.FUNCTIONS_DIR = os.path.join(self.parent.project.APAMA_HOME, "catalogs", "functions") except AttributeError: raise Exception("Project variable APAMA_HOME undefined in .pysysproject and FUNCTIONS_DIR unset for PROJECT") tup = [("functions", self.parent.project.FUNCTIONS_DIR)] if functions: tup.extend(functions) for name, location in tup: catalog = document.createElement("catalog") nameAtttribute = document.createAttribute("name") nameAtttribute.value=name typeAtttribute = document.createAttribute("type") typeAtttribute.value="FILE" locationAtttribute = document.createAttribute("location") locationAtttribute.value=location catalog.setAttributeNode(nameAtttribute) catalog.setAttributeNode(typeAtttribute) catalog.setAttributeNode(locationAtttribute) functionsElement.appendChild(catalog) catalogsElement.appendChild(functionsElement) with io.open(os.path.join(self.parent.output, "sm-config.xml"), "wb") as fp: fp.write(document.toprettyxml(indent=" ", encoding='utf-8'))
[docs] def hasLicence(self): """ Does this correlator instance have access to a licence file? """ return self.licence != None