apama.correlator
Contains PySys extensions for starting and interacting with correlator processes.
CorrelatorHelper
- class apama.correlator.CorrelatorHelper(parent, port=None, host=None, name='correlator', **startArgs)
Bases: apama.common.ApamaServerProcess
Class for an instance of an Apama correlator.
The easiest way to create an instance of this class and use it to start a new correlator is with the apama.testplugin.ApamaPlugin.startCorrelator() method (available via the self.apama helper field), for example:
mycorrelator = self.apama.startCorrelator('myCorrelator', config=[self.input+'/myconfig.yaml'])
mycorrelator.injectEPL(
    ['${APAMA_HOME}/monitors/'+f for f in ['ManagementImpl.mon', 'Management.mon']] +
    ['${appHome}/monitors/'+f for f in ['MyApp1.mon', 'MyApp2.mon']]
)
- Variables
~.parent – Reference to the PySys pysys.basetest.BaseTest testcase (or pysys.process.user.ProcessUser) instantiating this instance.
~.port (int) – Port used for starting and interaction with the process. If no port parameter is used in the argument list an available port will be dynamically found from the OS and used for starting the process, and performing all operations against it.
~.host (str) – Hostname for interaction with a remote process. The host parameter is only used to perform operations against a remote process started externally to the PySys framework - the class does not support the starting of a remote process.
~.name (str) – A display name for this process (default is “correlator”), also used for the default stdout/err filenames.
~.startArgs (dict(str,str)) – Default values can be provided here for any keyword arguments that are supported by start().
- classpath = None
Holds the Java classpath used when starting a correlator with JVM.
- injectJMON
Alias for injectJava.
- addToClassPath(path)
Add the supplied path to the Java classpath variable for starting this instance. It is usually better to use YAML configuration for setting the classpath.
- addToPath(path)
Add the supplied dynamic library directory path to the PATH (on Windows) or LD_LIBRARY_PATH (on Unix) environment variable for starting this instance.
You can also add to the path when starting the correlator using the “environ” argument:
environ={LIBRARY_PATH_ENV_VAR: os.getenv(LIBRARY_PATH_ENV_VAR)+os.pathsep+self.output+'/mylibpath'}
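The one-line environ expression above assumes the library path variable is already set; if it is not, os.getenv() returns None and the string concatenation fails. A minimal sketch of a more defensive way to build the same dictionary follows. The helper name library_path_environ and the local definition of LIBRARY_PATH_ENV_VAR are illustrative stand-ins, not part of the Apama or PySys API; in a real test the constant would be the one provided by PySys.

```python
import os
import sys

# Stand-in for the PySys constant of the same name (assumption: PATH on
# Windows and LD_LIBRARY_PATH elsewhere, as described for addToPath).
LIBRARY_PATH_ENV_VAR = 'PATH' if sys.platform.startswith('win') else 'LD_LIBRARY_PATH'


def library_path_environ(extra_dir, base_env=None):
    """Return an environ dict that appends extra_dir to the library path.

    Hypothetical helper: it tolerates the variable being unset or empty,
    which the one-line expression does not.
    """
    base_env = os.environ if base_env is None else base_env
    existing = base_env.get(LIBRARY_PATH_ENV_VAR, '')
    # Drop empty entries so no stray separator ends up in the result.
    parts = [p for p in existing.split(os.pathsep) if p]
    parts.append(extra_dir)
    return {LIBRARY_PATH_ENV_VAR: os.pathsep.join(parts)}
```

The returned dictionary could then be passed as the environ argument when starting the correlator.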
- start(logfile=None, verbosity=None, java=None, Xclock=None, environ=None, inputLog=None, waitForServerUp=None, config=None, configPropertyOverrides=None, license=None, **xargs)
Start the correlator.
Note that default values for any of this method’s parameters can be passed to the constructor when this object is instantiated.
- Parameters
arguments (list[str]) – a list of additional command line arguments to pass to the correlator.
logfile (str) – Name of the correlator log file (if used, set this to something similar to the display “name” passed to the constructor). Usually it is better to leave this unset and use the name argument of the constructor, which sets both logfile and the stdouterr prefix to the same name.
java (bool) – If True then the correlator will be started with support for injecting Java applications.
Xclock (bool) – If True then the correlator will be started in externally clocked mode, meaning that time is controlled by sending in &TIME(...) ticks from a .evt file rather than from the system clock.
environ (dict[str,str]) – Map of environment variables to add or override in addition to the defaults for this process.
For example, to add a directory containing dynamic library plug-ins to the library path variable of the current platform:
environ={LIBRARY_PATH_ENV_VAR: os.getenv(LIBRARY_PATH_ENV_VAR)+os.pathsep+self.output+'/mylibpath'}
inputLog (str) – Relative or absolute path of the file to write the input log to, containing all events, EPL and other inputs sent to the correlator. The format of the input log may change without notice, so it should not be relied upon in testcases; however, it can be useful to review manually for diagnostic purposes, and the input log can also be used to pass information to customer support in the event of a problem.
waitForServerUp (bool) – Set to False to disable automatically waiting until the component is ready.
config (list[str]|str) – Path or list of paths to an initialization or connectivity configuration .yaml file, or to a directory containing such files.
configPropertyOverrides – A dictionary of key,value pairs to be passed as Apama (not Java) configuration property overrides using the -D flag.
license – A path to the correlator license file. This can also be provided using the project property apamaCorrelatorLicense. If not specified, the default location is checked, and if no license is present there the correlator runs without a license.
verbosity (str) – The default verbosity level of all correlator logging. Setting this to DEBUG will give more information, but the resulting log is usually hard to read, so it is generally better to use a yaml file to set the level just for the packages of interest.
xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
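As an illustration of how the parameters above combine, the following sketch assembles a keyword-argument dictionary for start() without launching anything. The helper name start_args and the property name MY_PROPERTY in the usage note are hypothetical; only the keyword names config, Xclock and configPropertyOverrides come from the parameter list above.

```python
def start_args(config, xclock=False, overrides=None):
    """Assemble keyword arguments for start(); nothing is started here.

    Hypothetical helper, shown only to illustrate the documented parameters.
    """
    # config may be a single path or a list of paths; normalise to a list.
    paths = [config] if isinstance(config, str) else list(config)
    args = {'config': paths, 'Xclock': bool(xclock)}
    if overrides:
        # Apama (not Java) configuration properties, passed on with -D.
        args['configPropertyOverrides'] = dict(overrides)
    return args
```

In a testcase the result might be used as correlator.start(**start_args(self.input+'/myconfig.yaml', xclock=True, overrides={'MY_PROPERTY': '1'})), where correlator is a CorrelatorHelper that has not yet been started.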
- injectTestEventLogger(channels)
Injects a special monitor that listens for all input/output events on the specified channel(s) and writes them to the correlator log in a format that can later be read into a Python dictionary using the apama.testplugin.ApamaPlugin.extractEventLoggerOutput() method.
This is a convenient way to access and manipulate Apama event data within your Python test validation logic, typically using pysys.basetest.BaseTest.assertThat. This approach is a lot more reliable than using regular expressions or “.evt” file diffs.
The format used for these log lines is internal and subject to change, so should only be used by the extractEventLoggerOutput method.
Note that some event types cannot be handled with this logger (mostly unusual cases such as cyclic event types, and dictionary fields with complex key types). In the unlikely event that you get an error for the event type you need to test, you will need to use a different approach for validating your events.
The events are logged in the EPL package apama.test and the monitor name is TestEventLogger, so you can specify that package in your logging configuration if you wish to redirect these lines to a file other than the main correlator log.
Only a single instance of this can be active in a correlator at a time.
- Parameters
channels (list[str]) – The correlator channels the event logging monitor will subscribe to. Add an empty string to this list to include events sent to any public context.
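Once the logged events have been read back into Python, validation is ordinary dictionary handling. The sketch below filters a list of event dictionaries by type. It assumes each event is a dictionary whose '.eventType' key holds the event type name; that key name is an assumption here and should be checked against the extractEventLoggerOutput() documentation. The helper name and the sample data are hypothetical.

```python
def events_of_type(events, event_type):
    """Return only the event dictionaries whose type matches event_type."""
    # '.eventType' is assumed to be the key carrying the event type name.
    return [e for e in events if e.get('.eventType') == event_type]


# Hypothetical sample standing in for the list that
# extractEventLoggerOutput() would return in a real test.
sample = [
    {'.eventType': 'mypackage.Event1'},
    {'.eventType': 'mypackage.Event2', 'message': 'Hello World'},
]
matches = events_of_type(sample, 'mypackage.Event2')
```

The filtered list can then be compared against expected values with assertThat in the test's validation step.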
- receive(filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False, logChannels=False, **xargs)
Start a process that will receive events sent from the correlator.
See also injectTestEventLogger(), which provides an alternative way to record the events sent out of the correlator that is easier to process from your Python test case.
- Parameters
filename – The basename of the file to write events received from the correlator to
filedir – The directory to write filename to (defaults to testcase output subdirectory)
channels – List of channel names to subscribe to
logChannels – Print the channel each event came from in the output
suppressBatch – Do not include BATCH timestamps in the output
zeroAtFirstBatch – Measure BATCH timestamps from when the first batch arrived
utf8 – Write output in UTF8
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- Returns
The process for the receiver.
- watch(filename=None, filedir=None, raw=False, interval=None, **xargs)
Obtain runtime operational statistics from the correlator.
By default this runs as a BACKGROUND process. The process is returned by the method call.
- Parameters
filename – The basename of the file to write the runtime operational status to
filedir – The directory to write filename to (defaults to testcase output subdirectory)
raw – Obtain csv format data when logging to file
interval – The polling interval (seconds) between logging to file
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- Note
When outputting data in the raw (csv) format, the column identifiers and their positions are defined by WATCH_COLUMNS. Use CorrelatorHelper.WATCH_COLUMNS to look up the column position for a given identifier.
- inspect(filename='inspect.txt', filedir=None, raw=False, **xargs)
Obtain information about what application(s) have been injected into the correlator and what listeners are in existence.
This runs as a FOREGROUND process.
- Parameters
filename – The basename of the file to write the information to, e.g. inspect.txt
filedir – The directory to write filename to (defaults to testcase output subdirectory)
raw – Use parser-friendly output format
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- initialize(path, correlatorName=None, properties=None, include=None, exclude=None, **xargs)
Initialize the correlator by injecting all the files making up the project, typically based on a Designer launch configuration .deploy file.
This is usually the simplest way to inject all the files from an application into the correlator. Alternative approaches are to call the injectEPL and related methods individually for each file, or to specify the files in the “initialization” section of a yaml file passed into the correlator start call using the “config” argument.
Queries .mon files will be generated automatically as part of injection, but any Java jar files must be compiled manually before invoking this method.
- Parameters
path – Path of a .deploy file from Designer (recommended), a directory, or a text file listing the files to be injected. Must be an absolute path, or relative to the testcase output dir.
correlatorName – The name of the correlator as specified in the launch configuration .deploy file, e.g. “defaultCorrelator”. If not specified, the name of this PySys correlator will be used.
properties – Optional path to a .properties file specifying ${var} placeholders to be used for resolving the paths of any files outside the project directory. Absolute path or relative to output dir.
include – A comma-separated string specifying which of the project files found by the tool should be injected, e.g. **/foo/Bar*.evt,**.mon. If not specified, all files will be included (unless specifically excluded).
exclude – A comma-separated string specifying which of the project files found by the tool should NOT be injected, e.g. **/foo/Bar*.evt.
xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
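Because include and exclude take a single comma-separated string rather than a list, it can be convenient to keep the patterns in a Python list and join them just before the call. The following is a minimal sketch; the helper name pattern_arg is hypothetical and not part of the Apama API.

```python
def pattern_arg(patterns):
    """Join file patterns into the comma-separated form used by include/exclude."""
    cleaned = [p.strip() for p in patterns if p and p.strip()]
    for p in cleaned:
        # A comma inside a pattern would be read as a separator.
        if ',' in p:
            raise ValueError('pattern must not contain a comma: %r' % p)
    return ','.join(cleaned)
```

For example, pattern_arg(['**/foo/Bar*.evt', '**.mon']) produces the include string shown in the parameter description above.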
- injectEPL(filenames=[], filedir=None, utf8=False, **xargs)
Inject EPL *.mon files into the correlator.
Note that it is much more efficient to inject all files in a single call than to make many separate calls. For example:
self.injectEPL( ['${APAMA_HOME}/monitors/'+f for f in ['ManagementImpl.mon' ,'Management.mon', ] ]+ ['${appHome}/monitors/'+f for f in ['MyApp1.mon', 'MyApp2.mon', ] ]+ ... )
See also initialize, which can be used to inject an entire Apama project.
- Parameters
filenames – List of the EPL files to inject into the correlator, either absolute paths or relative to the filedir. Any ${...} project properties will be expanded. If the same path is listed more than once in a single call to this method, the later ones will be ignored to prevent duplicate injection errors (as of version 10.15.3).
filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
utf8 – Specifies the file contents are encoded in UTF-8, rather than in the machine’s default encoding.
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
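To make a single injectEPL call practical, the file list can be assembled up front from several directories. The sketch below builds such a list in injection order and drops repeated paths, mirroring the duplicate handling described for filenames. The helper name epl_paths is hypothetical; the ${...} placeholders are left untouched for injectEPL to expand.

```python
def epl_paths(groups):
    """Build one ordered list of EPL paths from (directory, filenames) pairs.

    Later repeats of a path are dropped, keeping the first occurrence.
    """
    seen = set()
    result = []
    for directory, names in groups:
        for name in names:
            path = directory.rstrip('/') + '/' + name
            if path not in seen:
                seen.add(path)
                result.append(path)
    return result


paths = epl_paths([
    ('${APAMA_HOME}/monitors', ['ManagementImpl.mon', 'Management.mon']),
    ('${appHome}/monitors', ['MyApp1.mon', 'MyApp2.mon', 'MyApp1.mon']),
])
```

The resulting list can be passed as the filenames argument in one call.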
- injectJava(filename, filedir=None, **xargs)
Inject a Java plug-in or application into the correlator.
See also initialize.
- Parameters
filename – The path of the jar file to inject into the correlator.
filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- injectCDP(filenames=[], filedir=None, **xargs)
Inject one or more correlator deployment packages (CDPs) into the correlator.
See also initialize.
- Parameters
filenames – List of the paths of cdp files to inject into the correlator.
filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- injectQuery(filename, filedir=None, diagnostics=False, **xargs)
Inject a Query into the correlator.
See also initialize.
- Parameters
filename – The query file to inject into the correlator
filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
diagnostics – Enable runtime diagnostic logging in the query
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- sendEventStrings(*eventStrings, **xargs)
Send one or more event strings into the correlator.
This method writes a temporary file containing the specified strings.
See the documentation for engine_send for more information.
For example:
self.sendEventStrings('mypackage.Event1()', 'mypackage.Event2("Hello World")')
- Parameters
eventStrings – One or more event strings to be sent to this correlator. May be unicode or UTF-8 byte strings. May include a channel designator.
xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
channel – The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts.
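When event strings are built from Python values rather than written by hand, quoting is easy to get wrong. The following is a minimal sketch of a formatter that produces strings in the same shape as the example above. It covers only string, boolean and integer fields, and the escaping rules (backslash-escaped quotes and backslashes, lowercase true/false) are an assumption about the event string syntax that should be checked against the engine_send documentation. The helper names are hypothetical.

```python
def format_field(value):
    """Render one Python value as an event field (str, bool and int only)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return 'true' if value else 'false'
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Escape backslashes before quotes so the added backslashes survive.
        escaped = value.replace('\\', '\\\\').replace('"', '\\"')
        return '"' + escaped + '"'
    raise TypeError('unsupported field type: %s' % type(value).__name__)


def format_event(event_type, *fields):
    """Render an event string such as mypackage.Event2("Hello World")."""
    return '%s(%s)' % (event_type, ','.join(format_field(f) for f in fields))
```

The results could then be passed straight to sendEventStrings, e.g. sendEventStrings(format_event('mypackage.Event2', 'Hello World')).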
- send(filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs)
Send events from one or more files into the correlator.
See the documentation for engine_send for more information.
- Parameters
filenames – List of the event file paths to send into the correlator
filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.
loop – Number of times to loop through the input file
utf8 – Specifies the file contents are encoded in UTF-8, rather than in the machine’s default encoding.
channel – The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts.
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- delete(names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs)
Delete named objects from the correlator.
- Parameters
names – List of names to delete from the correlator
filename – The basename of a file containing a set of names to delete
filedir – The directory containing filename (defaults to testcase input subdirectory)
force – Force deletion of names even if they are in use
kill – Kill name even if it is a running monitor
all – Delete everything in the correlator
utf8 – Specifies the file contents are encoded in UTF-8, rather than in the machine’s default encoding.
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- connect(source, channel=None, channels=None, mode=None, **xargs)
Connect a correlator to this instance as a source.
- Parameters
source – An instance of the CorrelatorHelper class to act as the source
channel – The channel to make the connection on
channels – The list of channels to make the connection on
mode – The connection mode - ‘legacy’ or ‘parallel’; parallel uses a connection per channel
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- disconnect(source, channel=None, channels=None, mode=None, **xargs)
Disconnect a correlator that is connected to this instance as a source.
- Parameters
source – An instance of the CorrelatorHelper class acting as the source
channel – The channel to be disconnected
channels – The list of channels to be disconnected
mode – The connection mode - ‘legacy’ or ‘parallel’; parallel uses a connection per channel
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- applicationEventLogging(enable=True, **xargs)
Enable or disable application event logging.
Provides a wrapper around the engine_management command line tool to enable and disable application event logging. Once enabled, application event logging writes information about specific processing occurrences to the correlator log file, e.g. the receipt of events for processing, the triggering of listeners, and execution of the garbage collector.
- Parameters
enable – Set to True to enable, set to False to disable event logging
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- setApplicationLogFile(filename=None, filedir=None, **xargs)
Set the application log file name.
On setting the application log file details, the output of all native log commands within EPL will be logged to the designated log file. This allows separation between the log statements written by the correlator i.e. for status, errors etc, and those generated by the actual application.
- Parameters
filename – The basename or path of the file to write the application log file to
filedir – The directory to write filename to (defaults to testcase output subdirectory)
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- setApplicationLogLevel(verbosity, **xargs)
Set the application log level.
- Parameters
verbosity – The verbosity level of the application logging
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- profilingOn(**xargs)
Inform the correlator to start collecting profiling statistics.
- Parameters
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- manage(arguments=[], displayName='manage', **xargs)
Execute component_management operations against the process.
- Parameters
arguments – The arguments to be passed to component_management
xargs – Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
- profilingOff(**xargs)
Inform the correlator to stop collecting profiling statistics.
- Parameters
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- running()
Returns True if this has a local process that is running, or False if it has stopped, or was not started by this object.
- shutdown(message='Shutdown requested by test', timeout=60, **args)
Requests a clean shutdown of the component.
If it was started by this object, also waits for the process to terminate, and silently ignores requests to shutdown if the process was already stopped.
- waitForComponentUp(timeout=60, **xargs)
Block until the component declares itself to be ready for processing.
- waitForLogGrep(expr, errorExpr=['(ERROR|FATAL|Failed to parse) .*'], ignores=None, mappers=None, **kwargs)
Wait until this component’s logfile contains the specified regular expression, such as a message indicating that the test has completed.
For example:
correlator.waitForLogGrep(r'INFO .* Test completed')
This is a wrapper around pysys.basetest.BaseTest.waitForGrep that adds functionality specific to Apama components, such as aborting early (with a clear outcome reason) if an error message is found in the log file or the process dies while waiting (with mappers to convert multi-line stack traces to a self-contained message), and always using the UTF-8 encoding to read the log file.
Any of the standard waitForGrep keywords such as ignores=, timeout= may also be passed in as keyword arguments.
- Parameters
expr (str) – The regular expression to search for in the log file. Remember to escape characters such as []()\.
condition (str) – The condition to be met for the number of lines matching the regular expression; by default we wait until there is at least one occurrence.
timeout (int) – The number of seconds to wait for the regular expression before giving up and aborting the test with pysys.constants.TIMEDOUT (unless abortOnError=False in which case execution will continue).
ignores (list[str]) – A list of regular expressions used to identify lines in the file which should be ignored when matching both expr and errorExpr. By default this is taken from the value of apama.testplugin.ApamaPlugin.defaultLogIgnores.
mappers (List[callable[str]->str]) – A list of filter functions that will be used to pre-process each line of the log. Unless explicitly overridden, mappers for EPL and Java will be used (apama.testplugin.ApamaPlugin.JoinEPLStackLines and pysys.mappers.JoinLines.JavaStackTrace).
- Returns
A list of the matches; see pysys.basetest.BaseTest.waitForGrep for details.
New in version 10.11.0.
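The advice to escape characters such as []()\ in expr can be illustrated with the standard re module alone. The sketch below builds an expression that matches a message literally and shows how the unescaped form fails to match. The helper name literal_log_expr and the sample log lines are hypothetical; DEFAULT_ERROR_EXPR is copied from the default errorExpr in the signature above, and real correlator log lines may be formatted differently.

```python
import re

# Copied from the default errorExpr value in the waitForLogGrep signature.
DEFAULT_ERROR_EXPR = '(ERROR|FATAL|Failed to parse) .*'


def literal_log_expr(level, message):
    """Build a regex for a line at the given level containing message literally."""
    return level + ' .* ' + re.escape(message)


# Hypothetical log lines, for illustration only.
line = '2024-01-01 12:00:00 INFO [123] - Result [ok] (1/1)'
error_line = '2024-01-01 12:00:01 ERROR [123] - Something failed'

escaped_match = re.search(literal_log_expr('INFO', 'Result [ok] (1/1)'), line)
# Unescaped, [ok] is a character class and (1/1) is a group, so the
# literal text in the line is not matched.
unescaped_match = re.search('INFO .* Result [ok] (1/1)', line)
error_match = re.search(DEFAULT_ERROR_EXPR, error_line)
```

An expression built this way could be passed as expr, for example correlator.waitForLogGrep(literal_log_expr('INFO', 'Result [ok] (1/1)')).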
- profilingReset(**xargs)
Inform the correlator to reset its collection of profiling statistics.
- Parameters
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- profilingGet(filename, filedir=None, **xargs)
Obtain the latest profiling statistics from the correlator.
- Parameters
filename – The basename of the file to write the profiling statistics to
filedir – The directory to write filename to (defaults to testcase output subdirectory)
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- toStringAll(filename, filedir=None, **xargs)
Obtain a stringified representation of the current application state from the correlator.
- Parameters
filename – The basename of the file to write the dump of application state to
filedir – The directory to write filename to (defaults to testcase output subdirectory)
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
- flush(timeout=60, count=1, **xargs)
Make sure all events have been flushed through the correlator.
Currently implemented by using the flushAllQueues management request. This will initiate a cycle where each queue in the correlator is drained, optionally repeated count times. This is useful when you have a multi-context application.
- Parameters
timeout – The amount of time to wait in seconds.
count – The number of times to ensure queues are flushed.
xargs – Optional
pysys.process.user.ProcessUser.startProcess
keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.