#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC). All rights
# Copyright (c) 2013-2023 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
""" Contains PySys extensions for starting and interacting with correlator processes.
"""
import sys, os, string, logging, socket, copy, random, types
import io
import pysys
import pysys.utils.fileutils
from pysys import log
from pysys.constants import *
from pysys.exceptions import *
from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
from apama.common import XArgsHolder
from apama.common import _getApamaCommonJRE
import apama.coverage
[docs]class CorrelatorHelper(ApamaServerProcess):
"""Class for an instance of an Apama correlator.
The easiest way to create an instance of this class and use it to start a new correlator is with the
`apama.testplugin.ApamaPlugin.startCorrelator()` method (available via the ``self.apama`` helper field),
for example::
mycorrelator = self.apama.startCorrelator('myCorrelator', config=[self.input+'/myconfig.yaml'])
mycorrelator.injectEPL(
['${APAMA_HOME}/monitors/'+f for f in ['ManagementImpl.mon' ,'Management.mon', ] ]+
['${appHome}/monitors/'+f for f in ['MyApp1.mon', 'MyApp2.mon', ] ]+
)
:ivar ~.parent: Reference to the PySys pysys.basetest.BaseTest testcase (or pysys.process.user.ProcessUser) instantiating this instance.
:ivar int ~.port: Port used for starting and interaction with the process. If no port parameter is used in the
argument list an available port will be dynamically found from
the OS and used for starting the process, and performing all operations against it.
:ivar str ~.host: Hostname for interaction with a remote process. The host
parameter is only used to perform operations against a remote process started externally to the
PySys framework - the class does not support the starting of a remote process.
:ivar str ~.name: A display name for this process (default is "correlator"), also used for the default
stdout/err filenames.
:ivar dict(str,str) ~.startArgs: Default values can be provided here for any keyword
arguments that are supported by `start()`.
"""
classpath = None
""" Holds the Java classpath used when starting a correlator with JVM."""
licence = None # legacy, prefer to use apamaCorrelatorLicense project property or license= argument
WATCH_COLUMNS = [
"Uptime (ms)",
"# Contexts",
"# Monitors",
"# Sub-monitors",
"# Java apps",
"# Listeners",
"# Sub-listeners",
"# Event types",
"Input queue",
"# Received events",
"Route queue",
"# Routed events",
"# Consumers",
"Output queue",
"# Created output events",
"# Sent output events",
"# Processed events",
"Slowest context name",
"Slowest context queue size",
"Slowest receiver",
"Slowest receiver queue"
]
def __init__(self, parent, port=None, host=None, name='correlator', **startArgs):
ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host)
self.componentType = 'correlator'
self.injectJMON = self.injectJava # old alias for this method
"""
Alias for `injectJava`.
"""
self.injectMon = self.injectEPL # shorter alias for injecting a .mon file
"""
Alias for `injectEPL`.
"""
self.injectMonitorscript = self.injectEPL # Backwards compatibility
"""
Alias for `injectEPL`.
"""
self.__startArgs = startArgs
self.licence = parent.project.getProperty('apamaCorrelatorLicense', '') or os.path.join(self.parent.project.APAMA_WORK, 'license', 'ApamaServerLicense.xml')
if not os.path.exists(self.licence):
# Don't put licence on the command line if it doesn't exist - run
# in evaluation mode, or find it automatically
log.debug('No correlator license found at default location: %s', self.licence)
self.licence = None
""" Set the licence file location. """
[docs] def addToClassPath(self, path):
"""Add the supplied path to the Java classpath variable for starting this instance.
Is is usually better to use YAML configuration for setting the classpath.
"""
if self.classpath == None:
self.classpath = os.path.normpath(path)
else:
self.classpath = self.classpath+os.pathsep+os.path.normpath(path)
[docs] def addToPath(self, path):
"""Add the supplied dynamic library directory path to the ``PATH`` (on Windows) or ``LD_LIBRARY_PATH`` (on Unix)
environment variable for starting this instance.
You can also add to the path when starting the correlator using the ''environ'' argument::
environ={LIBRARY_PATH_ENV_VAR: os.getenv(LIBRARY_PATH_ENV_VAR)+os.pathsep+self.output+'/mylibpath'}
"""
if not IS_WINDOWS: key = "LD_LIBRARY_PATH"
else: key = "PATH"
if key in self.environ:
self.environ[key] = self.environ[key]+os.pathsep+os.path.normpath(path)
else:
self.environ[key] = os.path.normpath(path)
[docs] def start(self, logfile=None, verbosity=None, java=None, Xclock=None, environ=None, inputLog=None, waitForServerUp=None, config = None, configPropertyOverrides=None, license=None, **xargs):
"""Start the correlator.
Note that default values for any of this method's parameters can be passed to the constructor when this object is instantiated.
:param list[str] arguments: a list of additional command line arguments to pass to the correlator.
:param str logfile: Name of the correlator log file (if used, set this to something similar to the display "name"
passed to the constructor). Usually it is better to leave his unset and use the ``name`` of the constructor
which sets both logfile and the stdouterr prefix to the same name.
:param bool java: If True then the correlator will be started with support for injecting Java applications.
:param bool Xclock: If True then the correlator will be started in externally clocked mode, meaning that time
is controlled by sending in ``&TIME(...)`` ticks from a .evt file rather than from the system clock.
:param dict[str,str] environ: Map of environment variables to add or override in addition to the defaults for this process.
For example, to add a directory containing dynamic library plug-ins to the library path variable of the current platform::
environ={LIBRARY_PATH_ENV_VAR: os.getenv(LIBRARY_PATH_ENV_VAR)+os.pathsep+self.output+'/mylibpath'}
:param str inputLog: Relative or absolute path of file to write input log to, containing all events, EPL and other
inputs sent to the correlator. The format of the input log may change without notice so should not be
replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the
input log can also be used to pass information to customer support in the event of a problem.
:param bool waitForServerUp: Set to False to disable automatically waiting until the component is ready.
:param list[str]|str config: path or list of paths to a initialization or connectivity configuration .yaml file or directory
containing them
:param configPropertyOverrides: a dictionary of key,value pairs to be passed as Apama (not Java) configuration
property overrides using the ``-D`` flag.
:param license: a path to the correlator license file. This can also be provided using the project property ``apamaCorrelatorLicense``.
If not specified, the default location is checked and if not present there then the correlator runs without a license.
:param str verbosity: The default verbosity level of all correlator logging. Setting this to DEBUG will give
more information, but usually the log is hard to read, so instead use a yaml file to set the level just
for the packages of interest.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'correlator')
dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name)
startargs = copy.deepcopy(self.__startArgs)
def argsExtractor(argName, value, defaultValue): # get the arg either from start(xxx=) OR else from the argument stashed by the constructor
tempValue = startargs.pop(argName, None)
if value is not None: return value
return defaultValue if tempValue is None else tempValue
self.logfile = logfile = argsExtractor('logfile',logfile,None)
verbosity = argsExtractor('verbosity',verbosity,None)
java = argsExtractor('java',java,False)
Xclock = argsExtractor('Xclock',Xclock,None)
environ = argsExtractor('environ',environ,None)
inputLog = argsExtractor('inputLog',inputLog,None)
waitForServerUp = argsExtractor('waitForServerUp',waitForServerUp,True)
config = argsExtractor('config',config,None)
configPropertyOverrides = argsExtractor('configPropertyOverrides',configPropertyOverrides,None)
self.licence = argsExtractor('license',license,None) or self.licence # "license" spelling is now preferred as that's what correlator uses; "self.licence" field is less used, and not worth fixing
xargs = dict(list(startargs.items()) + list(xargs.items())) # xargs takes precedence
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project)
# set the arguments to the process
arguments = []
arguments.extend(["--name", self.name, "-p", "%d" % self.port])
if self.licence: arguments.extend(["-l", "%s" % self.licence])
if logfile: arguments.extend(["-f", logfile])
if verbosity: arguments.extend(["-v", verbosity])
if java: arguments.append("--java")
if inputLog: arguments.extend(["--inputLog", os.path.join(self.parent.output, inputLog)])
if Xclock: arguments.append("-Xclock")
if config is not None:
if isinstance(config, str):
arguments.extend(['--config', config])
elif type(config) is list:
for param in config:
arguments.extend(['--config', param])
else:
raise Exception("Input parameter for config is not a string or list type")
if java and self.classpath:
arguments.extend(["--javaopt", "-Djava.class.path=%s" % self.classpath])
if configPropertyOverrides:
arguments.extend(['-D%s=%s' % (p, configPropertyOverrides[p]) for p in configPropertyOverrides])
arguments.extend(xargs.arguments)
env = {}
if apama.coverage.EPLCoverageWriter._isEPLCoverageEnabled(getattr(self.parent, 'runner', self.parent)) and not self.parent.getBoolProperty('disableCoverage'):
env['AP_EPL_COVERAGE_FILE'] = 'correlator.${PID}.eplcoverage'
# start the process
for key in self.environ: env[key] = self.environ[key]
if environ:
for key in environ: env[key] = environ[key]
if self.process and self.process.running():
raise Exception('Cannot start component as an instance is already running: %s'%self)
self.process = None
try:
hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), onError=lambda process: None, **xargs.kwargs)
self.process = hprocess
if waitForServerUp:
self.waitForComponentUp(timeout=xargs.timeout)
except Exception:
for f in [
logfile,
self.process.stdout if self.process else xargs.stdout,
self.process.stderr if self.process else xargs.stderr]:
if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*', 'Invalid Correlator argument.*'], tail=True): break
raise
return hprocess
[docs] def injectTestEventLogger(self, channels):
"""
Injects a special monitor that listens for all input/output events on the specified channel(s) and writes them
to the correlator log in a format that can later be read into a Python dictionary using the
`apama.testplugin.ApamaPlugin.extractEventLoggerOutput()` method.
This is a really convenient way to access and manipulate Apama event data within your Python test
validation logic, typically using `pysys.basetest.BaseTest.assertThat`. This approach is a lot more reliable
than using regular expressions or ".evt" file diffs.
The format used for these log lines is internal and subject to change, so should only be used by
the `extractEventLoggerOutput` method.
Note that there some event types cannot be handled with this logger (mostly unusual cases such as
cyclic event types, and dictionary fields with complex key types). In the unlikely event that you get an error
for the event type you need to test, you will need to use a different approach for validating your events.
The events are logged in the EPL package ``apama.test`` and the monitor name is ``TestEventLogger``, so you
can specify that package in your logging configuration if you wish to redirect these lines to a file other
than the main correlator log.
Only a single instance of this can be active in a correlator at a time.
:param list[str] channels: The correlator channels the event logging monitor will subscribe to. Add an empty
string to this list to include events sent to any public context.
"""
# could optionally put timestamp and channel info in here too
monfile = self.parent.output+'/__apama_TestEventLogger.tmp.mon'
self.parent.write_text(monfile, r"""package apama.test;
monitor TestEventLogger {
import "JSONPlugin" as _plugin; // this is not public API, do not copy this into your own code/tests
action onload() {
spawn privateContext() to context("JSONEventLogger", @public@);
}
action privateContext()
{
@subscriptions@
on all any() as evt {
try {
string json := _plugin.toJSON(evt);
json := "{\".eventType\":\""+evt.getTypeName()+"\","+json.substring(1, json.length());
log "-- Got test event: "+json at INFO;
} catch (com.apama.exceptions.Exception ex) {
log "TestEventLogger cannot process this event: "+ex.getMessage()+": "+evt.toString() at ERROR;
}
}
}
}
""".replace('@public@', 'true' if (not channels or '' in channels) else 'false').replace('@subscriptions@', '\n'.join('\t\tmonitor.subscribe("%s");'%c.replace('\\','\\\\') for c in channels))
)
self.injectEPL([monfile])
if not self.parent.log.isEnabledFor(logging.DEBUG): os.remove(monfile)
[docs] def receive(self, filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False,
logChannels=False, **xargs):
"""Start a process that will receive events sent from the correlator.
See also `injectTestEventLogger()` which provides an alternative way to record the events sent out of the
correlator that is easier to process from your Python test case.
:param filename: The basename of the file to write events received from the correlator to
:param filedir: The directory to write filename to (defaults to testcase output subdirectory)
:param channels: List of channel names to subscribe to
:param logChannels: Print the channel each event came from in the output
:param suppressBatch: Do not include BATCH timestamps in the output
:param zeroAtFirstBatch: Measure BATCH timestamps from when the first batch arrived
:param utf8: Write output in UTF8
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
:return: The process for the receiver.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin','engine_receive')
displayName = 'engine_receive <%s> [%s]'%(self.name, os.path.basename(filename))
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'receive')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set location of output file (defaults to output subdirectory)
if not filedir: filedir = self.parent.output
file = os.path.join(filedir, filename)
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if filename: arguments.extend(["-f", file])
if suppressBatch: arguments.append("--suppressbatch")
if zeroAtFirstBatch: arguments.append("--zeroatfirstbatch")
if utf8: arguments.append("--utf8")
if logChannels:
arguments.append("--logChannels")
arguments.extend(xargs.arguments)
for channel in channels:
arguments.extend(['--channel', channel])
# start the process
proc = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
self.parent.waitForFile(filename, filedir = filedir)
return proc
[docs] def watch(self, filename=None, filedir=None, raw=False, interval=None, **xargs):
"""Obtain runtime operational statistics from the correlator.
By default this runs as a BACKGROUND process. The process is returned by the method call.
:param filename: The basename of the file to write the runtime operational status to
:param filedir: The directory to write filename to (defaults to testcase output subdirectory)
:param raw: Obtain csv format data when logging to file
:param interval: The polling interval (seconds) between logging to file
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
:note: When outputing data in the raw (csv) format, the column identifiers and their positions
are defined by WATCH_COLUMNS. Use CorrelatorHelper.WATCH_COLUMNS to look up the column position for a given identifier:
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'engine_watch')
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'watch')
displayName = "engine_watch <%s> -> %s"%(self.name, os.path.basename(filename or dstdout))
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set location of output file (defaults to output subdirectory)
if not filedir: filedir = self.parent.output
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if filename: arguments.extend(["-f", os.path.join(filedir, filename)])
if raw: arguments.append("--raw")
if interval: arguments.extend(["-i", "%d" % int(interval*1000)])
arguments.extend(xargs.arguments)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def inspect(self, filename='inspect.txt', filedir=None, raw=False, **xargs):
"""Obtain information about what application(s) have been injected
into the correlator and what listeners are in existence.
This runs as a FOREGROUND process.
:param filename: The basename of the file to write the information to, e.g. inspect.txt
:param filedir: The directory to write filename to (defaults to testcase output subdirectory)
:param raw: Use parser-friendly output format
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
assert filename
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'engine_inspect')
displayName = "engine_inspect <%s> -> %s"%(self.name, os.path.basename(filename))
# set the default stdout and stderr
dstdout,dstderr = os.path.join(self.parent.output, filename), os.path.join(self.parent.output, filename.replace('.txt','')+'.err')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, state=FOREGROUND, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set location of output file (defaults to output subdirectory)
if not filedir: filedir = self.parent.output
file = os.path.join(filedir, filename)
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if raw: arguments.append("--raw")
arguments.extend(xargs.arguments)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def initialize(self, path, correlatorName=None, properties=None, include=None, exclude=None, **xargs):
"""Initialize the correlator by injecting all the files making up the project,
typically based on a Designer launch configuration .deploy file.
This is usually the simplest way to inject all the files from an application
into the correlator. Alternative approaches are to call the L{injectEPL} and related
methods individually for each file, or to specify the files in the "initialization"
section of a yaml file passed into the correlator L{start} call using the "config" argument.
Queries and Digital Event Services .mon files will be generated automatically as part of injection,
but any Java jar files must be compiled manually before invoking this method.
:param path: Path of a .deploy file from Designer (recommended), a directory,
or a text file listing the files to be injected.
Must be an absolute path, or relative to the testcase output dir.
:param correlatorName: The name of the correlator as specified in the launch configuration .deploy
file, e.g "defaultCorrelator". If not specified, the name of this pysys correlator will be used.
:param properties: Optional path to a .properties file specifying ${var} placeholders
to be used for resolving the paths of any files outside the project directory.
Absolute path or relative to output dir.
:param include: a comma-separated string specifying which of the project files found by the tool
should be injected, e.g. ``**/foo/Bar*.evt,**.mon``. If not specified, all files will be included
(unless specifically excluded)
:param exclude: a comma-separated string specifying which of the project files found by the tool
should NOT be injected, e.g. ``**/foo/Bar*.evt``.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(_getApamaCommonJRE(self.parent.project), "bin", "java")
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'initialize-%s'%self.name)
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set the input and output names for qry and mon
path = os.path.join(self.parent.output, path)
# set arguments to the process
arguments = []
arguments.append("-Djava.awt.headless=true")
arguments.append("-Dlog4j.configurationFile=file:///%s/etc/log4j-engine-deploy.xml"%self.parent.project.APAMA_HOME.replace('\\','/'))
arguments.append("-Djava.io.tmpdir=%s"%self.parent.output)
arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', 'ap-deploy.jar')])
arguments.append(path)
if properties:
assert isinstance(properties, str)
arguments.append(os.path.join(self.parent.output, properties))
if correlatorName or ('!' not in path):
arguments.extend(['--correlatorName', correlatorName or self.name])
arguments.extend(['--inject', self.host, str(self.port)])
if include:
assert isinstance(include, str)
arguments.extend(['--include', include])
if exclude:
assert isinstance(exclude, str)
arguments.extend(['--exclude', exclude])
arguments.extend(xargs.arguments)
# start the process to generate the EPL
status = self.parent.startProcess(command, arguments, self.environ,
displayName='initialize <%s> [%s]'%(self.name, os.path.basename(path)), **xargs.kwargs)
# in case there are queries involved
return status
[docs] def injectEPL(self, filenames=[], filedir=None, utf8=False, **xargs):
"""Inject EPL ``*.mon`` files into the correlator.
Note that it is much more efficient to inject all files in a single call then to make many separate calls.
For example::
self.injectEPL(
['${APAMA_HOME}/monitors/'+f for f in ['ManagementImpl.mon' ,'Management.mon', ] ]+
['${appHome}/monitors/'+f for f in ['MyApp1.mon', 'MyApp2.mon', ] ]+
...
)
See also L{initialize}, which can be used to inject an entire Apama project.
:param filenames: List of the EPL files to inject into the correlator, either absolute paths or relative to the ``filedir``.
Any ``${...}`` project properties will be expanded.
If the same path is listed more than once in a single call to this method, the later ones will be ignored
to prevent duplicate injection errors (as of version 10.15.3).
:param filedir: By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
:param utf8: Specifies the file contents are encoded in UTF-8, rather than in the machine's default encoding.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
return self.__inject(filenames, filedir, utf8, False, False, **xargs)
[docs] def injectJava(self, filename, filedir=None, **xargs):
"""Inject a Java plug-in or application into the correlator.
See also L{initialize}.
:param filename: The path of the jar file to inject into the correlator.
:param filedir: By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
return self.__inject([filename], filedir, False, True, False, **xargs)
[docs] def injectCDP(self, filenames=[], filedir=None, **xargs):
"""Inject correlator deployment package into the correlator.
See also L{initialize}.
:param filenames: List of the paths of cdp files to inject into the correlator.
:param filedir: By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
return self.__inject(filenames, filedir, False, False, True, **xargs)
[docs] def injectQuery(self, filename, filedir=None, diagnostics=False, **xargs):
"""Inject a Query into the correlator.
See also L{initialize}.
:param filename: The query file to inject into the correlator
:param filedir: By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
:param diagnostics: Enable runtime diagnostic logging in the query
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(_getApamaCommonJRE(self.parent.project), "bin", "java")
jarbase = "ap-query-codegen"
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, jarbase)
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set the input and output names for qry and mon
monitorName = os.path.join(self.parent.output, os.path.basename(filename) + ".mon")
if not filedir: filedir = self.parent.input
qryName = os.path.join(filedir, filename)
# set arguments to the process
arguments = []
arguments.append("-Djava.awt.headless=true")
arguments.append("-DAPAMA_LOG_IMPL=simple")
arguments.append("-DAPAMA_LOG_LEVEL=INFO")
arguments.extend(["-jar", os.path.join(self.parent.project.APAMA_HOME, 'lib', '%s.jar' % jarbase)])
arguments.extend(["--host", self.host, "--port", '%s' % self.port, qryName, monitorName])
if diagnostics: arguments.extend(["--diagnostics"])
arguments.extend(xargs.arguments)
ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)
# start the process to generate the EPL
status = self.parent.startProcess(command, arguments, self.environ,
displayName='query generation for %s'%os.path.basename(qryName), ignoreExitStatus=True, **xargs.kwargs)
# if successful, inject the EPL
if status and (status.exitStatus==0):
status = self.__inject([monitorName])
if status and status.exitStatus==0:
# delete after successful injection, since it's a temporary file and shouldn't be in code coverage reports etc
try:
os.remove(monitorName)
except Exception as e:
self.parent.log.warn('Failed to remove temp file %s: %s', monitorName, e)
self.flush(count=6)
if status and status.exitStatus != 0 and not ignoreExitStatus:
self.parent.addOutcome(BLOCKED, '%s returned non-zero exit code %d'%(status, status.exitStatus), abortOnError=self.parent.defaultAbortOnError)
[docs] def sendEventStrings(self, *eventStrings, **xargs):
"""Send one or more event strings into the correlator.
This method writes a temporary file containing the specified strings.
See the documentation for engine_send for more information.
For example::
self.sendEventStrings('mypackage.Event1()', 'mypackage.Event2("Hello World")')
:param eventStrings: One or more event strings to be sent to this correlator.
May be unicode or UTF-8 byte strings. May include a channel designator.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
:keyword channel: The channel to which events are to be sent except when specified
on a per-event basis. If a channel is not specified for
an event and you do not specify this option, the event is delivered to the
default channel, which means the event will go to all public contexts.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'engine_send')
# set the default stdout and stderr
dstdout,dstderr = self.parent.allocateUniqueStdOutErr('send')
assert eventStrings # must not be empty
assert not isinstance(eventStrings, str) # should be a list of strings, not a string
# transform xargs into an instance of the xargs holder class
tmpfile = dstderr.replace('.err','')+'.tmp.evt'
with io.open(pysys.utils.fileutils.toLongPathSafe(tmpfile),'w', encoding='utf-8') as f:
for l in eventStrings:
if isinstance(l, bytes):
# we have to guess at the encoding; utf-8 is slightly safer than local/default encoding
# since at least 7-bit ascii chars will always work
l = l.decode('utf-8')
print(l.strip(),file=f)
displayName = "engine_send <%s> [%s%s]"%(self.name, eventStrings[0], ' (+%d others)'%(len(eventStrings)-1) if len(eventStrings)>1 else '')
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
arguments.append('--utf8')
if xargs.get('channel',''): arguments.extend(["--channel", xargs.pop('channel')])
arguments.append(tmpfile)
kwargs = xargs
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
arguments.extend(xargs.arguments)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def send(self, filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs):
"""Send events from one or more file into the correlator.
See the documentation for engine_send for more information.
:param filenames: List of the event file paths to send into the correlator
:param filedir: By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
:param loop: Number of times to loop through the input file
:param utf8: Specifies the file contents are encoded in UTF-8, rather than in the machine's default encoding.
:param channel: The channel to which events are to be sent except when specified
on a per-event basis. If a channel is not specified for
an event and you do not specify this option, the event is delivered to the
default channel, which means the event will go to all public contexts.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'engine_send')
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'send')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set location of input files (defaults to input directory)
files=[]
if not filedir: filedir = self.parent.input
if isinstance(filenames,list):
for file in filenames: files.append(os.path.join(filedir, file))
elif isinstance(filenames, str):
files.append(os.path.join(filedir, filenames))
else:
raise Exception("Input parameter for filenames is not a string or list type")
displayName = "engine_send <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files)))
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if loop: arguments.extend(["--loop", "%s" % loop])
if utf8: arguments.append("--utf8")
if channel: arguments.extend(["--channel", channel])
arguments.extend(xargs.arguments)
arguments.extend(files)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def delete(self, names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs):
"""Delete named objects from the Event Crrelator.
:param names: List of names to delete from the correlator
:param filename: The basename of a file containing a set of names to delete
:param filedir: The directory containing filename (defaults to testcase input subdirectory)
:param force: Force deletion of names even if they are in use
:param kill: Kill name even if it is a running monitor
:param all: Delete everything in the correlator
:param utf8: Specifies the file contents are encoded in UTF-8, rather than in the machine's default encoding.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'engine_delete')
displayName = "engine_delete <%s> %s"%(self.name, '<all>' if all else '[%s]'%(' '.join(names)))
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'delete')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set location of input files (defaults to input directory)
if not filedir and filename:
filedir = self.parent.input
filename = os.path.join(filedir, filename)
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if filename: arguments.extend(["-f", filename])
if force: arguments.append("--force")
if kill: arguments.append("--kill")
if all: arguments.extend(["--all","--yes"])
if utf8: arguments.append("--utf8")
arguments.extend(xargs.arguments)
if names: arguments.extend(names)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def connect(self, source, channel=None, channels=None, mode=None, **xargs):
"""Connect a correlator to this instance as a source.
:param source: An instance of the L{CorrelatorHelper} class to act as the source
:param channel: The channel to make the connection on
:param channels: The list of channels to make the connection on
:param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'engine_connect')
displayName = "engine_connect %s<%s channel '%s' -> %s>"%(
'disconnect ' if ('-x' in xargs.get('arguments',[]) or '--disconnect' in xargs.get('arguments',[])) else '',
source.name, channel or '*', self.name)
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'connect')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set the arguments to the process
arguments = []
arguments.extend(["-sn", source.host])
arguments.extend(["-sp", "%d" % source.port])
arguments.extend(["-tn", self.host])
arguments.extend(["-tp", "%d" % self.port])
if channel: arguments.extend(["-c", channel])
if channels:
assert not isinstance(channels, str) # should be a list of strings, not a string
for c in channels:
arguments.extend(["-c", c])
if mode: arguments.extend(["-m", mode])
arguments.extend(xargs.arguments)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def disconnect(self, source, channel=None, channels=None, mode=None, **xargs):
"""Disconnect a correlator to this instance as a source correlator.
:param source: An instance of the L{CorrelatorHelper} class acting as the source
:param channel: The channel to be disconnected
:param channels: The list of channels to be disconnected
:param mode: The connection mode - 'legacy' or 'parallel'; parallel uses a connection per channel
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
if 'arguments' in xargs:
if xargs['arguments'].count('-x') == 0 and xargs['arguments'].count('--disconnect') == 0:
xargs['arguments'].append('-x')
else:
xargs['arguments'] = ['-x']
self.connect(source, channel, channels, mode, **xargs)
[docs] def applicationEventLogging(self, enable=True, **xargs):
"""Enable and disable application event logging.
Provides a wrapper around the engine_management command line tool to enable and disable
application event logging. Once enabled, application event logging will log to the correlator
log file information specific processing occurrences, e.g. the receipt of events for
processing, the triggering of listeners, execution of the garbage collector etc.
:param enable: Set to True to enable, set to False to disable event logging
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
if enable:
self.manage(arguments=['-r', 'applicationEventLogging', 'on'], **xargs)
else:
self.manage(arguments=['-r', 'applicationEventLogging', 'off'], **xargs)
[docs] def setApplicationLogFile(self, filename=None, filedir=None, **xargs):
"""Set the application log file name.
On setting the application log file details, the output of all native log commands within
EPL will be logged to the designated log file. This allows separation between the
log statements written by the correlator i.e. for status, errors etc, and those
generated by the actual application.
:param filename: The basename or path of the file to write the application log file to
:param filedir: The directory to write filename to (defaults to testcase output subdirectory)
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
if not filedir: filedir = self.parent.output
self.manage(arguments=['-r', 'setApplicationLogFile', os.path.join(filedir, filename)], **xargs)
[docs] def setApplicationLogLevel(self, verbosity, **xargs):
"""Set the application log level.
:param verbosity: The verbosity level of the application logging
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
self.manage(arguments=['-r', 'setApplicationLogLevel', verbosity], **xargs)
[docs] def profilingOn(self, **xargs):
"""Inform the correlator to start collecting profiling statistics.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
self.manage(arguments=['-r', 'cpuProfile', 'on'], **xargs)
[docs] def profilingOff(self, **xargs):
"""Inform the correlator to stop collecting profiling statistics.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
self.manage(arguments=['-r', 'cpuProfile', 'off'], **xargs)
[docs] def profilingReset(self, **xargs):
"""Inform the correlator to reset it's collection of profiling statistics.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
self.manage(arguments=['-r', 'cpuProfile', 'reset'], **xargs)
[docs] def profilingGet(self, filename, filedir=None, **xargs):
"""Obtain the latest profiling statistics from the correlator.
:param filename: The basename of the file to write the profiling statistics to
:param filedir: The directory to write filename to (defaults to testcase output subdirectory)
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
if not filedir: filedir = self.parent.output
self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','cpuProfile', 'get'], **xargs)
[docs] def toStringAll(self, filename, filedir=None, **xargs):
"""Obtain a stringified representation of the current application state from the correlator.
:param filename: The basename of the file to write the dump of application state to
:param filedir: The directory to write filename to (defaults to testcase output subdirectory)
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
if not filedir: filedir = self.parent.output
self.manage(stdout=os.path.join(filedir, filename), arguments=['-r','toStringAll'], **xargs)
[docs] def flush(self, timeout=60, count=1, **xargs):
"""Make sure all events have been flushed through the correlator.
Currently implemented by using the flushAllQueues management request.
Will initate a cycle where each queue in the correlator is drained,
optionally repeated count times. This is useful when you have a
multi-context application.
:param timeout: The amount of time to wait in seconds.
:param count: The number of times to ensure queues are flushed.
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
flushAllQueues = 'flushAllQueues'
if count > 1:
self.manage(arguments=['-r', 'flushAllQueues', str(count)], timeout=timeout, **xargs)
else:
self.manage(arguments=['-r', 'flushAllQueues'], timeout=timeout, **xargs)
def __inject(self, filenames=[], filedir=None, utf8=False, java=False, cdp=False, **xargs):
"""Inject an application into the correlator.
Returns the process object (or None in some error cases)
:param xargs: Optional L{pysys.process.user.ProcessUser.startProcess} keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin','engine_inject')
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'inject')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set location of input files (defaults to input directory)
files=[]
if not filedir: filedir = self.parent.input
def tryExpandProperties(x):
try:
return self.parent.project.expandProperties(x)
except Exception as ex: # expansion was added in 10.15.3; this makes sure it doesn't break anything
log.debug('Cannot expand project properties in "%s" : %s', x, repr(ex))
return x
if isinstance(filenames,list):
alreadyseen = set()
for file in filenames:
path = os.path.normpath(os.path.join(filedir, tryExpandProperties(file)))
if os.path.normcase(path) in alreadyseen:
log.debug('Ignoring attempt to inject file more than once: %s'%path)
else:
files.append(path)
alreadyseen.add(os.path.normcase(path))
elif isinstance(filenames, str):
files.append(os.path.join(filedir, tryExpandProperties(filenames)))
else:
raise Exception("Input parameter for filenames is not a string or list type")
if not files: raise Exception("Cannot call inject without specifying the files to be injected")
displayName = "engine_inject <%s> [%s]"%(self.name, ' '.join(map(os.path.basename, files)))
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if utf8: arguments.append("--utf8")
if java: arguments.append("--java")
if cdp: arguments.append("--cdp")
arguments.extend(xargs.arguments)
arguments.extend(files)
# remove from xargs so we don't pass it to startProcess directly
ignoreExitStatus = xargs.kwargs.pop('ignoreExitStatus', None)
# start the process
result = self.parent.startProcess(command, arguments, self.environ, displayName=displayName, ignoreExitStatus=True,
onError=lambda process: None, **xargs.kwargs)
firstwarningline = ''
if result and result.exitStatus != 0:
with io.open(pysys.utils.fileutils.toLongPathSafe(xargs.stderr)) as f: # chars with default encoding
for l in f:
l = l.replace(self.parent.input+os.sep,'').strip()
if not l: continue
if not firstwarningline: firstwarningline = l
self.parent.log.warning(' %s'%l)
if result and result.exitStatus != 0 and not ignoreExitStatus:
self.parent.addOutcome(BLOCKED,
'%s failed: %s'%(result, firstwarningline) if firstwarningline else '%s returned non-zero exit code %d'%(result, result.exitStatus),
abortOnError=self.parent.defaultAbortOnError)
return result
[docs] def hasLicence(self):
""" Does this correlator instance have access to a licence file? """
return self.licence != None