# Source code for apama.testplugin

#!/usr/bin/env python3
# Copyright (c) 2020-2021 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains a PySys test plugin that provides a simple way to access Apama functionality from your testcases using ``self.apama.XXX()``. 
"""

import sys, os, logging, types
import json

import pysys
from pysys.constants import *

import apama.correlator
import apama.iaf
import apama.build
import apama.project

class ApamaPlugin(object):
    """PySys test plugin class for Apama, which provides access to Apama functionality from individual testcases.

    To use this plugin make sure your project configuration contains::

        <test-plugin classname="apama.testplugin.ApamaPlugin" alias="apama"/>

    The members of this class will then be available to any test in the project using the "apama" alias
    i.e. ``self.apama.XXX``. For example to call `startCorrelator`::

        correlator = self.apama.startCorrelator('testCorrelator', ...)

    .. versionadded:: Apama 10.7.0
    """

    def setup(self, testObj):
        """Attach this plugin to a test object and initialise its loggers and per-test default log ignores.

        :param testObj: The test object that owns this plugin; it is stored as both ``self.owner`` and
            ``self.testObj``, and its ``project.getProperty()`` is used to read ``apamaDefaultLogIgnores``.
        """
        self.owner = self.testObj = testObj
        self.log = logging.getLogger('pysys.apama.ApamaPlugin')
        self.__verboseAssertionsLog = logging.getLogger('pysys.assertions.apama.ApamaPlugin')

        # nb: this needs to always create a new list, as we don't want people accidentally mutating the project property
        self.defaultLogIgnores = testObj.project.getProperty('apamaDefaultLogIgnores', [])+[
            # Tests running in parallel can get de-scheduled for some time, so ignore this:
            ' WARN .*Took [,.0-9]*s for an invocation of .*',
        ]

    @staticmethod
    def _withProcessDefaults(name, xargs):
        """Return a copy of ``xargs`` with this plugin's default process-start options filled in.

        Options the caller supplied explicitly are never overridden.

        :param str name: The logical process name, used to derive the default ``stdouterr`` and ``logfile``.
        :param dict xargs: The keyword arguments supplied by the caller.
        :return: A new dict; the ``xargs`` passed in is not modified.
        """
        options = dict(xargs)
        options.setdefault('abortOnError', True)
        options.setdefault('ignoreExitStatus', False)
        options.setdefault('stdouterr', name)
        options.setdefault('logfile', name+'.log')
        return options

    def startCorrelator(self, name='testCorrelator', arguments=None, java=None, Xclock=False, config=None, **xargs):
        """Start a new correlator process on a dynamically allocated port, and wait until it is running.

        :param str name: A logical name for this correlator, which will be used for the log filename, stdouterr
            and the component name.
        :param bool java: If True then the correlator will be started with support for injecting Java applications.
        :param bool Xclock: If True then the correlator will be started in externally clocked mode, meaning that
            time is controlled by sending in ``&TIME(...)`` ticks from a .evt file rather than from the system clock.
        :param list[str]|str config: path or list of paths to a initialization or connectivity configuration
            .yaml file or directory containing them.
        :param list[str] arguments: Additional arguments to be passed to the correlator. Defaults to no
            additional arguments.
        :param xargs: Additional keyword arguments that will be passed to
            `apama.correlator.CorrelatorHelper.start()`.
        :return: An instance of `apama.correlator.CorrelatorHelper` that you can use to interact with your
            application.
        """
        # Use a fresh list per call rather than a shared mutable default, so nothing downstream can
        # leak arguments from one correlator start into the next. Caller-supplied values pass through as-is.
        if arguments is None:
            arguments = []

        c = apama.correlator.CorrelatorHelper(self.testObj, name=name)
        c.start(arguments=arguments, java=java, Xclock=Xclock, config=config,
            **self._withProcessDefaults(name, xargs))
        return c

    def startIAF(self, name, configname, **xargs):
        """Start a new IAF (adapter) process on a dynamically allocated port, and wait until it is running.

        :param str name: A logical name for this IAF process, which will be used for the log filename, stdouterr
            and the component name.
        :param str configname: The IAF configuration file or template name
        :param xargs: Additional keyword arguments that will be passed to `apama.iaf.IAFHelper.start()`.
        :return: An instance of `apama.iaf.IAFHelper`.
        """
        i = apama.iaf.IAFHelper(self.testObj, name=name)
        i.start(configname=configname, **self._withProcessDefaults(name, xargs))
        return i

    TEST_EVENT_LOGGER_REGEX = '^[-0-9]* [0-9:.]* INFO .[0-9]*. - apama.test.TestEventLogger .[0-9]*. -- Got test event: (.*)'
    """
    The regular expression that identifies each event written to the correlator log by
    `apama.correlator.CorrelatorHelper.injectTestEventLogger()` and extracted using `extractEventLoggerOutput()`.
    """

    def extractEventLoggerOutput(self, logFile):
        """
        Uses log messages generated by `apama.correlator.CorrelatorHelper.injectTestEventLogger()` to extract
        a list of events from the specified correlator log file in a form that's easy to manipulate from Python
        in your test's validate() method.

        Top-level events are each represented as a dictionary with an item for each event field, and a special
        key named ``.eventType`` whose value identifies the event type.

        Note that the order of the event fields is not preserved, however all dictionaries in the returned value
        will be sorted alphabetically by key to ensure consistent results.

        This method is often used with `pysys.basetest.BaseTest.assertThat`, for example::

            sensor1_temps = [
                # Extract only the field value(s) we care about (allows us to ignore unimportant information, timestamps, etc):
                (evt['temperature'])
                for evt in self.apama.extractEventLoggerOutput('testCorrelator.log')
                # Filter to include the desired subset of events:
                if evt['.eventType']=='apamax.myapp.Alert' and evt['sensorId']=='TempSensor001'
            ]
            self.assertThat('sensor1_temps == expected', sensor1_temps=sensor1_temps, expected=[
                111.0,
                120,
                145.2,
            ])

        For debugging purposes, a readable multi-line JSON representation of the events is logged when run using::

            pysys run -v assertions.apama=debug

        :param str logFile: The path of the correlator log file containing the logged events.
        :return: A list[dict[str,obj]] where each item is a dictionary representing the fields of an Apama event.
        """
        # do a double load since otherwise the dictionaries are sorted randomly which is a really bad idea in testcases
        events = [
            json.loads(json.dumps(json.loads(evt), sort_keys=True, allow_nan=True))
            for evt in self.testObj.grepAll(logFile, self.TEST_EVENT_LOGGER_REGEX, encoding='utf-8')
        ]

        # Guarded so the (potentially large) pretty-printed dump is only built when it will actually be logged
        if self.__verboseAssertionsLog.isEnabledFor(logging.DEBUG):
            self.__verboseAssertionsLog.debug('extractEventLoggerOutput from %s: \n%s', logFile,
                json.dumps(events, ensure_ascii=False, allow_nan=True, indent=' '))
        return events

    def projectHelper(self, projectName, **kwargs):
        """Create a new instance of the `apama.project.ProjectHelper` class that can be used for creating,
        manipulating and deploying Apama project directories.

        :param str projectName: The project directory.
        :param kwargs: Additional arguments to be passed to the `apama.project.ProjectHelper` constructor.
        :return: An instance of `apama.project.ProjectHelper`.
        """
        # NOTE(review): this calls lower-case ``apama.project.projectHelper`` whereas the documentation above
        # refers to the ``ProjectHelper`` class - confirm that this name exists in apama.project.
        return apama.project.projectHelper(self.testObj, name=projectName, **kwargs)

    def antBuild(self, buildfile, **kwargs):
        """
        Run an ant build file with Apama environment properties set, typically to generate a project artifact
        such as a Java plugin or adapter.

        Runs in a 'build' subdirectory of the parent's output directory.

        Be careful to ensure that the ant build file generates its output under its working directory, or under
        an explicitly specified directory that is located inside the test output directory.

        :param str buildfile: absolute path to the ant build.xml to run
        :param kwargs: Additional keyword arguments; see `apama.build.antBuild()` for details.
        """
        return apama.build.antBuild(self.testObj, buildfile=buildfile, **kwargs)

    @staticmethod
    def _stripThreadId(line):
        """Replace the first ``[thread_identifier]`` in a log line with the placeholder ``...``.

        :param str line: A single log line.
        :return: The line with the first `` [xxx] - `` replaced by `` ... - ``; unchanged if there is none.
        """
        # Imported here because ``re`` is not among this module's explicit top-level imports
        import re

        # ``[^\]]+`` stops at the first closing bracket, so only the thread id is replaced even if the
        # message text that follows happens to contain another "] - " sequence
        return re.sub(r' \[[^\]]+\] - ', ' ... - ', line, count=1)

    @staticmethod
    def JoinEPLStackLines():
        """
        Mapper that joins the lines of an EPL stack dump from a correlator log file into a single line,
        for easier grepping and more self-contained test outcome failure reasons.

        See `pysys.mappers.JoinLines` for similar mappers for other languages.

        This mapper also converts ``[thread_identifier]`` to the placeholder ``...``.

        Here is an example of using this mapper for checking no errors were logged::

            self.assertGrep('testCorrelator.log', '(ERROR|FATAL|Failed to parse) .*', contains=False,
                mappers=[self.apama.JoinEPLStackLines(), pysys.mappers.JoinLines.JavaStackTrace()])

        .. versionadded:: 10.11.0
        """
        def combiner(lines):
            # remove the thread id
            firstline = ApamaPlugin._stripThreadId(lines[0])
            # For continuation lines keep only the text after the first ' - ' separator
            return pysys.mappers.JoinLines.defaultCombiner([firstline]+
                [l[l.find(' - ')+3:].strip() for l in lines[1:]])

        return pysys.mappers.JoinLines(
            startAt=' (ERROR|FATAL) ',
            # Stop when the indenting stops
            continueWhile=r'(.* Stack dump:|\] +- \t)',
            combiner=combiner,
        )

    defaultLogIgnores = []
    """
    A list of regular expressions indicating lines to be ignored when checking Apama log files for errors and
    warnings.

    For example to use this with `~pysys.basetest.BaseTest.assertGrep`::

        self.assertGrep('testCorrelator.log', 'WARN .*', contains=False,
            ignores=self.apama.defaultLogIgnores+['An extra regex to ignore in this testcase'])

    Project-specific ignores can be added to this list by setting the ``apamaDefaultLogIgnores`` project property
    to a value delimited by either commas or newlines.

    .. versionadded:: 10.11.0
    """