Source code for apama.runner

#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights
# Copyright (c) 2013,2015-2016, 2018 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains PySys custom runner that adds support for EPL code coverage reporting. 
"""

from __future__ import print_function
import sys, stat, os, string, logging, socket, copy, random, types

from pysys import rootLogger
from pysys.constants import *
from pysys.exceptions import *
from pysys.baserunner import BaseRunner
from pysys.process.helper import ProcessWrapper
from apama.correlator import CorrelatorHelper
from pysys.utils.pycompat import *

class ApamaRunner(BaseRunner):
    """A custom PySys runner for Apama tests.

    Supports the ability to generate EPL code coverage reports. Coverage
    collection and reporting is only performed when the C{eplcoverage}
    attribute of this runner is set to C{true} (compared case-insensitively).

    Coverage settings can be configured by setting the following attributes
    on this class: eplcoverageargs, eplcoveragesource, eplcoverageinclude,
    eplcoverageexclude, eplcoveragetitle. All of these except
    eplcoveragetitle may be given either as a list or as a comma-separated
    string.
    """

    def __init__(self, record, purge, cycle, mode, threads, outsubdir, descriptors, xargs):
        """Construct the runner and, if EPL coverage is enabled, register the
        coverage report generation as a cleanup function.

        All arguments are passed through unchanged to C{BaseRunner.__init__}.
        """
        BaseRunner.__init__(self, record, purge, cycle, mode, threads, outsubdir, descriptors, xargs)
        self.output = os.path.join(self.output, self.outsubdir)

        # Paths of the .eplcoverage files gathered by testComplete()
        self.__eplCoverageFiles = []

        if self.__isEplCoverageEnabled():
            self.addCleanupFunction(self.__generateEplCoverage)

    def __isEplCoverageEnabled(self):
        """Return True if the C{eplcoverage} attribute requests coverage.

        Accepts the string C{true} in any letter case, and also the boolean
        C{True}; any other value (or a missing attribute) means disabled.
        Non-string values no longer raise AttributeError.
        """
        value = getattr(self, 'eplcoverage', '')
        if value is True:
            return True
        return isstring(value) and value.lower() == 'true'

    @staticmethod
    def __toStringList(value):
        """Normalize a setting that may be a string or a list into a list.

        @param value: Either a falsy value (giving an empty list), a string
            containing one or more comma-separated items, or an existing
            list/sequence of items which is returned as-is.
        @return: A list (or the sequence passed in) of items. For string input
            every item is stripped of surrounding whitespace and empty items
            are dropped, so that no empty arguments reach the command line.
        """
        if not value:
            return []
        if isstring(value):
            stripped = [item.strip() for item in value.split(',')]
            return [item for item in stripped if item]
        return value

    def __generateEplCoverage(self):
        """Run the epl_coverage tool over all the coverage files collected by
        L{testComplete}, writing the report to an C{eplcoverage} directory
        underneath this runner's output directory.
        """
        cov = os.path.normpath(self.output+'/eplcoverage')
        if not os.path.exists(cov):
            os.makedirs(cov)
        # start from an empty directory so stale output is not left behind
        self.purgeDirectory(cov)
        self.log.info('Writing EPL code coverage output to: %s', cov)

        # The list of coverage files is passed to the tool via a file (using
        # the @file argument below); this file is always written as utf-8
        with openfile(cov+'/coverage_files.txt', 'w', encoding='utf-8') as f:
            for coverageFile in self.__eplCoverageFiles:
                if isinstance(coverageFile, binary_type):
                    # we have to guess at the encoding; utf-8 is slightly safer than local/default encoding
                    # since at least 7-bit ascii chars will always work in utf8
                    coverageFile = coverageFile.decode('utf-8')
                print(coverageFile, file=f)

        arguments = [
            '--output', cov,
            '--exclude', '**/Input/**.mon',  # test files aren't interesting
            # exclude all files from Apama monitor dir.
            '--exclude', PROJECT.APAMA_HOME+'/monitors/**',  # not useful
            '--exclude', '**/*.qry.mon',
            '--exclude', '**/*.sdf.mon',
        ]

        # user-configurable settings, each contributing "<flag> <item>" pairs
        for attrName, flag in (
                ('eplcoverageinclude', '--include'),
                ('eplcoverageexclude', '--exclude'),
                ('eplcoveragesource', '--source')):
            for item in self.__toStringList(getattr(self, attrName, '')):
                arguments.extend([flag, item])

        # arbitrary extra arguments are passed through verbatim
        arguments.extend(self.__toStringList(getattr(self, 'eplcoverageargs', '')))

        if hasattr(self, 'eplcoveragetitle'):
            arguments.extend(['--title', self.eplcoveragetitle])

        arguments.append('@%s/coverage_files.txt'%cov)

        self.startProcess(
            PROJECT.APAMA_HOME+'/bin/epl_coverage',
            arguments=arguments,
            state=FOREGROUND,
            displayName='epl_coverage (with %d coverage files)'%(len(self.__eplCoverageFiles)),
            stdout=cov+'/epl_coverage.out',
            stderr=cov+'/epl_coverage.err',
        )

    def isPurgableFile(self, path):
        """Override that prevents C{.eplcoverage} files from being purged.

        @param path: The path of the file being considered for purging
        @return: False for any path ending with C{.eplcoverage}, otherwise
            whatever C{BaseRunner.isPurgableFile} returns for the path
        """
        if path.endswith('.eplcoverage'):
            return False
        return BaseRunner.isPurgableFile(self, path)

    def purgeDirectory(self, dir, delTop=False):
        """Recursively purge a directory removing all files and sub-directories.

        Symbolic links are removed rather than followed, so the contents of a
        directory that is merely linked to from C{dir} are never deleted.

        Any OSError is logged as a warning rather than propagated, in which
        case the directory may be left only partially purged.

        @param dir: The top level directory to be purged
        @param delTop: Indicates if the top level directory should also be deleted
        """
        try:
            for entry in os.listdir(dir):
                path = os.path.join(dir, entry)
                # lstat (not stat) on every platform, so that a symlink is
                # seen as a link and not as whatever it points to; this also
                # avoids an error being raised for dangling links
                mode = os.lstat(path)[stat.ST_MODE]

                if stat.S_ISLNK(mode):
                    os.unlink(path)
                elif stat.S_ISREG(mode):
                    os.remove(path)
                elif stat.S_ISDIR(mode):
                    self.purgeDirectory(path, delTop=True)

            if delTop:
                os.rmdir(dir)
        except OSError as ex:
            self.log.warning("Caught OSError in purgeDirectory():")
            self.log.warning(ex)
            self.log.warning("Directory %s may not be completely purged" % dir)

    def testComplete(self, testObj, dir):
        """Record any C{.eplcoverage} files found in a test's output directory
        (when EPL coverage is enabled), then delegate to
        C{BaseRunner.testComplete}.

        @param testObj: Passed through to C{BaseRunner.testComplete}
        @param dir: The directory to search (non-recursively) for
            C{.eplcoverage} files; ignored for coverage purposes if it does
            not exist
        """
        if self.__isEplCoverageEnabled() and os.path.exists(dir):
            self.__eplCoverageFiles.extend(
                dir+'/'+name for name in os.listdir(dir) if name.endswith('.eplcoverage'))
        BaseRunner.testComplete(self, testObj, dir)