apama.correlator

Contains PySys extensions for starting and interacting with correlator processes.

CorrelatorHelper

class apama.correlator.CorrelatorHelper(parent, port=None, host=None, name='correlator', **startArgs)[source]

Bases: apama.common.ApamaServerProcess

Class for an instance of an Apama correlator.

The easiest way to create an instance of this class and use it to start a new correlator is with the apama.testplugin.ApamaPlugin.startCorrelator() method (available via the self.apama helper field), for example:

mycorrelator = self.apama.startCorrelator('myCorrelator', config=[self.input+'/myconfig.yaml'])
mycorrelator.injectEPL(
        ['${APAMA_HOME}/monitors/'+f for f in ['ManagementImpl.mon' ,'Management.mon', ] ]+
        ['${appHome}/monitors/'+f for f in ['MyApp1.mon', 'MyApp2.mon', ] ]+
        )
Variables
  • ~.parent – Reference to the PySys pysys.basetest.BaseTest testcase (or pysys.process.user.ProcessUser) instantiating this instance.

  • ~.port (int) – Port used for starting and interaction with the process. If no port parameter is used in the argument list an available port will be dynamically found from the OS and used for starting the process, and performing all operations against it.

  • ~.host (str) – Hostname for interaction with a remote process. The host parameter is only used to perform operations against a remote process started externally to the PySys framework - the class does not support the starting of a remote process.

  • ~.name (str) – A display name for this process (default is “correlator”), also used for the default stdout/err filenames.

  • ~.startArgs (dict(str,str)) – Default values can be provided here for any keyword arguments that are supported by start().

classpath = None

Holds the Java classpath used when starting a correlator with JVM.

injectJMON

Alias for injectJava.

injectMon

Alias for injectEPL.

injectMonitorscript

Alias for injectEPL.

addToClassPath(path)[source]

Add the supplied path to the Java classpath variable for starting this instance. Is is usually better to use YAML configuration for setting the classpath.

addToPath(path)[source]

Add the supplied dynamic library directory path to the PATH (on Windows) or LD_LIBRARY_PATH (on Unix) environment variable for starting this instance.

You can also add to the path when starting the correlator using the ‘’environ’’ argument:

environ={LIBRARY_PATH_ENV_VAR: os.getenv(LIBRARY_PATH_ENV_VAR)+os.pathsep+self.output+'/mylibpath'}
start(logfile=None, verbosity=None, java=None, Xclock=None, environ=None, inputLog=None, waitForServerUp=None, config=None, configPropertyOverrides=None, license=None, **xargs)[source]

Start the correlator.

Note that default values for any of this method’s parameters can be passed to the constructor when this object is instantiated.

Parameters
  • arguments (list[str]) – a list of additional command line arguments to pass to the correlator.

  • logfile (str) – Name of the correlator log file (if used, set this to something similar to the display “name” passed to the constructor). Usually it is better to leave his unset and use the name of the constructor which sets both logfile and the stdouterr prefix to the same name.

  • java (bool) – If True then the correlator will be started with support for injecting Java applications.

  • Xclock (bool) – If True then the correlator will be started in externally clocked mode, meaning that time is controlled by sending in &TIME(...) ticks from a .evt file rather than from the system clock.

  • environ (dict[str,str]) –

    Map of environment variables to add or override in addition to the defaults for this process.

    For example, to add a directory containing dynamic library plug-ins to the library path variable of the current platform:

    environ={LIBRARY_PATH_ENV_VAR: os.getenv(LIBRARY_PATH_ENV_VAR)+os.pathsep+self.output+'/mylibpath'}
    

  • inputLog (str) – Relative or absolute path of file to write input log to, containing all events, EPL and other inputs sent to the correlator. The format of the input log may change without notice so should not be replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the input log can also be used to pass information to customer support in the event of a problem.

  • waitForServerUp (bool) – Set to False to disable automatically waiting until the component is ready.

  • config (list[str]|str) – path or list of paths to a initialization or connectivity configuration .yaml file or directory containing them

  • configPropertyOverrides – a dictionary of key,value pairs to be passed as Apama (not Java) configuration property overrides using the -D flag.

  • license – a path to the correlator license file. This can also be provided using the project property apamaCorrelatorLicense. If not specified, the default location is checked and if not present there then the correlator runs without a license.

  • verbosity (str) – The default verbosity level of all correlator logging. Setting this to DEBUG will give more information, but usually the log is hard to read, so instead use a yaml file to set the level just for the packages of interest.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

injectTestEventLogger(channels)[source]

Injects a special monitor that listens for all input/output events on the specified channel(s) and writes them to the correlator log in a format that can later be read into a Python dictionary using the apama.testplugin.ApamaPlugin.extractEventLoggerOutput() method.

This is a really convenient way to access and manipulate Apama event data within your Python test validation logic, typically using pysys.basetest.BaseTest.assertThat. This approach is a lot more reliable than using regular expressions or “.evt” file diffs.

The format used for these log lines is internal and subject to change, so should only be used by the extractEventLoggerOutput method.

Note that there some event types cannot be handled with this logger (mostly unusual cases such as cyclic event types, and dictionary fields with complex key types). In the unlikely event that you get an error for the event type you need to test, you will need to use a different approach for validating your events.

The events are logged in the EPL package apama.test and the monitor name is TestEventLogger, so you can specify that package in your logging configuration if you wish to redirect these lines to a file other than the main correlator log.

Only a single instance of this can be active in a correlator at a time.

Parameters

channels (list[str]) – The correlator channels the event logging monitor will subscribe to. Add an empty string to this list to include events sent to any public context.

receive(filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False, logChannels=False, **xargs)[source]

Start a process that will receive events sent from the correlator.

See also injectTestEventLogger() which provides an alternative way to record the events sent out of the correlator that is easier to process from your Python test case.

Parameters
  • filename – The basename of the file to write events received from the correlator to

  • filedir – The directory to write filename to (defaults to testcase output subdirectory)

  • channels – List of channel names to subscribe to

  • logChannels – Print the channel each event came from in the output

  • suppressBatch – Do not include BATCH timestamps in the output

  • zeroAtFirstBatch – Measure BATCH timestamps from when the first batch arrived

  • utf8 – Write output in UTF8

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

Returns

The process for the receiver.

watch(filename=None, filedir=None, raw=False, interval=None, **xargs)[source]

Obtain runtime operational statistics from the correlator.

By default this runs as a BACKGROUND process. The process is returned by the method call.

Parameters
  • filename – The basename of the file to write the runtime operational status to

  • filedir – The directory to write filename to (defaults to testcase output subdirectory)

  • raw – Obtain csv format data when logging to file

  • interval – The polling interval (seconds) between logging to file

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

Note

When outputing data in the raw (csv) format, the column identifiers and their positions are defined by WATCH_COLUMNS. Use CorrelatorHelper.WATCH_COLUMNS to look up the column position for a given identifier:

inspect(filename='inspect.txt', filedir=None, raw=False, **xargs)[source]

Obtain information about what application(s) have been injected into the correlator and what listeners are in existence.

This runs as a FOREGROUND process.

Parameters
  • filename – The basename of the file to write the information to, e.g. inspect.txt

  • filedir – The directory to write filename to (defaults to testcase output subdirectory)

  • raw – Use parser-friendly output format

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

initialize(path, correlatorName=None, properties=None, include=None, exclude=None, **xargs)[source]

Initialize the correlator by injecting all the files making up the project, typically based on a Designer launch configuration .deploy file.

This is usually the simplest way to inject all the files from an application into the correlator. Alternative approaches are to call the injectEPL and related methods individually for each file, or to specify the files in the “initialization” section of a yaml file passed into the correlator start call using the “config” argument.

Queries and Digital Event Services .mon files will be generated automatically as part of injection, but any Java jar files must be compiled manually before invoking this method.

Parameters
  • path – Path of a .deploy file from Designer (recommended), a directory, or a text file listing the files to be injected. Must be an absolute path, or relative to the testcase output dir.

  • correlatorName – The name of the correlator as specified in the launch configuration .deploy file, e.g “defaultCorrelator”. If not specified, the name of this pysys correlator will be used.

  • properties – Optional path to a .properties file specifying ${var} placeholders to be used for resolving the paths of any files outside the project directory. Absolute path or relative to output dir.

  • include – a comma-separated string specifying which of the project files found by the tool should be injected, e.g. **/foo/Bar*.evt,**.mon. If not specified, all files will be included (unless specifically excluded)

  • exclude – a comma-separated string specifying which of the project files found by the tool should NOT be injected, e.g. **/foo/Bar*.evt.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

injectEPL(filenames=[], filedir=None, utf8=False, **xargs)[source]

Inject EPL *.mon files into the correlator.

Note that it is much more efficient to inject all files in a single call then to make many separate calls. For example:

self.injectEPL(
        ['${APAMA_HOME}/monitors/'+f for f in ['ManagementImpl.mon' ,'Management.mon', ] ]+
        ['${appHome}/monitors/'+f for f in ['MyApp1.mon', 'MyApp2.mon', ] ]+
        ...
)

See also initialize, which can be used to inject an entire Apama project.

Parameters
  • filenames – List of the EPL files to inject into the correlator, either absolute paths or relative to the filedir. Any ${...} project properties will be expanded. If the same path is listed more than once in a single call to this method, the later ones will be ignored to prevent duplicate injection errors (as of version 10.15.3).

  • filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.

  • utf8 – Specifies the file contents are encoded in UTF-8, rather than in the machine’s default encoding.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

injectJava(filename, filedir=None, **xargs)[source]

Inject a Java plug-in or application into the correlator.

See also initialize.

Parameters
  • filename – The path of the jar file to inject into the correlator.

  • filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

injectCDP(filenames=[], filedir=None, **xargs)[source]

Inject correlator deployment package into the correlator.

See also initialize.

Parameters
  • filenames – List of the paths of cdp files to inject into the correlator.

  • filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

injectQuery(filename, filedir=None, diagnostics=False, **xargs)[source]

Inject a Query into the correlator.

See also initialize.

Parameters
  • filename – The query file to inject into the correlator

  • filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.

  • diagnostics – Enable runtime diagnostic logging in the query

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

sendEventStrings(*eventStrings, **xargs)[source]

Send one or more event strings into the correlator.

This method writes a temporary file containing the specified strings.

See the documentation for engine_send for more information.

For example:

self.sendEventStrings('mypackage.Event1()', 'mypackage.Event2("Hello World")')
Parameters
  • eventStrings – One or more event strings to be sent to this correlator. May be unicode or UTF-8 byte strings. May include a channel designator.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

  • channel – The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts.

send(filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs)[source]

Send events from one or more file into the correlator.

See the documentation for engine_send for more information.

Parameters
  • filenames – List of the event file paths to send into the correlator

  • filedir – By default any relative paths are assumed to be in the test input directory, but this can be overridden with this parameter.

  • loop – Number of times to loop through the input file

  • utf8 – Specifies the file contents are encoded in UTF-8, rather than in the machine’s default encoding.

  • channel – The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

delete(names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs)[source]

Delete named objects from the Event Crrelator.

Parameters
  • names – List of names to delete from the correlator

  • filename – The basename of a file containing a set of names to delete

  • filedir – The directory containing filename (defaults to testcase input subdirectory)

  • force – Force deletion of names even if they are in use

  • kill – Kill name even if it is a running monitor

  • all – Delete everything in the correlator

  • utf8 – Specifies the file contents are encoded in UTF-8, rather than in the machine’s default encoding.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

connect(source, channel=None, channels=None, mode=None, **xargs)[source]

Connect a correlator to this instance as a source.

Parameters
  • source – An instance of the CorrelatorHelper class to act as the source

  • channel – The channel to make the connection on

  • channels – The list of channels to make the connection on

  • mode – The connection mode - ‘legacy’ or ‘parallel’; parallel uses a connection per channel

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

disconnect(source, channel=None, channels=None, mode=None, **xargs)[source]

Disconnect a correlator to this instance as a source correlator.

Parameters
  • source – An instance of the CorrelatorHelper class acting as the source

  • channel – The channel to be disconnected

  • channels – The list of channels to be disconnected

  • mode – The connection mode - ‘legacy’ or ‘parallel’; parallel uses a connection per channel

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

applicationEventLogging(enable=True, **xargs)[source]

Enable and disable application event logging.

Provides a wrapper around the engine_management command line tool to enable and disable application event logging. Once enabled, application event logging will log to the correlator log file information specific processing occurrences, e.g. the receipt of events for processing, the triggering of listeners, execution of the garbage collector etc.

Parameters
  • enable – Set to True to enable, set to False to disable event logging

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

setApplicationLogFile(filename=None, filedir=None, **xargs)[source]

Set the application log file name.

On setting the application log file details, the output of all native log commands within EPL will be logged to the designated log file. This allows separation between the log statements written by the correlator i.e. for status, errors etc, and those generated by the actual application.

Parameters
  • filename – The basename or path of the file to write the application log file to

  • filedir – The directory to write filename to (defaults to testcase output subdirectory)

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

setApplicationLogLevel(verbosity, **xargs)[source]

Set the application log level.

Parameters
profilingOn(**xargs)[source]

Inform the correlator to start collecting profiling statistics.

Parameters

xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

manage(arguments=[], displayName='manage', **xargs)

Execute component_management operations against the process.

Parameters
  • arguments – The arguments to be passed to component_management

  • xargs – Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir

profilingOff(**xargs)[source]

Inform the correlator to stop collecting profiling statistics.

Parameters

xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

running()

Returns True if this has a local process that is running, or False if it has stopped, or was not started by this object.

shutdown(message='Shutdown requested by test', timeout=60, **args)

Requests a clean shutdown of the component.

If it was started by this object, also waits for the process to terminate, and silently ignores requests to shutdown if the process was already stopped.

waitForComponentUp(timeout=60, **xargs)

Block until the the component declares itself to be ready for processing.

waitForLogGrep(expr, errorExpr=['(ERROR|FATAL|Failed to parse) .*'], ignores=None, mappers=None, **kwargs)

Wait until this component’s logfile contains the specified regular expression, such as a message indicating that the test has completed.

For example:

correlator.waitForLogGrep(r'INFO .* Test completed')

This is a wrapper around pysys.basetest.BaseTest.waitForGrep that adds functionality specific to Apama components, such as aborting early (with a clear outcome reason) if an error message is found in the log file or the process dies while waiting (with mappers to convert multi-line stack traces to a self-contained message), and always using the UTF-8 encoding to read the log file.

Any of the standard waitForGrep keywords such as ignores=, timeout= may also be pass in as keyword arguments.

Parameters
  • expr (str) – The regular expression to search for in the log file. Remember to escape characters such as []()\.

  • condition (str) – The condition to be met for the number of lines matching the regular expression; by default we wait until there is at least one occurrence.

  • timeout (int) – The number of seconds to wait for the regular expression before giving up and aborting the test with pysys.constants.TIMEDOUT (unless abortOnError=False in which case execution will continue).

  • ignores (list[str]) – A list of regular expressions used to identify lines in the file which should be ignored when matching both expr and errorExpr. By default this is taken from the value of the apama.testplugin.ApamaPlugin.defaultLogIgnores.

  • mappers (List[callable[str]->str]) – A list of filter functions that will be used to pre-process each line of the log. Unless explicitly overridden, mappers for EPL and Java will be used ( apama.testplugin.ApamaPlugin.JoinEPLStackLines and pysys.mappers.JoinLines.JavaStackTrace).

Returns

A list of the matches; see pysys.basetest.BaseTest.waitForGrep for details.

New in version 10.11.0.

profilingReset(**xargs)[source]

Inform the correlator to reset it’s collection of profiling statistics.

Parameters

xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

profilingGet(filename, filedir=None, **xargs)[source]

Obtain the latest profiling statistics from the correlator.

Parameters
  • filename – The basename of the file to write the profiling statistics to

  • filedir – The directory to write filename to (defaults to testcase output subdirectory)

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

toStringAll(filename, filedir=None, **xargs)[source]

Obtain a stringified representation of the current application state from the correlator.

Parameters
  • filename – The basename of the file to write the dump of application state to

  • filedir – The directory to write filename to (defaults to testcase output subdirectory)

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

flush(timeout=60, count=1, **xargs)[source]

Make sure all events have been flushed through the correlator.

Currently implemented by using the flushAllQueues management request. Will initate a cycle where each queue in the correlator is drained, optionally repeated count times. This is useful when you have a multi-context application.

Parameters
  • timeout – The amount of time to wait in seconds.

  • count – The number of times to ensure queues are flushed.

  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.

hasLicence()[source]

Does this correlator instance have access to a licence file?