Source code for apama.runner

#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights
# Copyright (c) 2013,2015-2016, 2018-2021 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains the deprecated PySys custom runner that adds support for EPL code coverage reporting. 
"""

import sys, stat, os, string, logging, socket, copy, random, types
import io

import pysys
from pysys import rootLogger
from pysys.constants import *
from pysys.exceptions import *
from pysys.baserunner import BaseRunner
from pysys.utils.fileutils import deletedir, mkdir, pathexists
import apama.coverage

log = logging.getLogger('pysys.apama.coverage')

[docs]class ApamaRunner(BaseRunner): """ (Deprecated) A custom PySys runner for Apama tests. Supports the ability to generate EPL code coverage reports using ``pysys run -Xeplcoverage``. Coverage settings can be configured by setting the following attributes on this class in code or via -X options: ``eplcoverageargs, eplcoveragesource, eplcoverageinclude, eplcoverageexclude, eplcoveragetitle``. This class is deprecated. Remove the ApamaRunner from your project configuration and instead add an instance of `apama.coverage.EPLCoverageWriter`:: <writer classname="apama.coverage.EPLCoverageWriter"> <property name="destDir" value="__coverage_epl.${outDirName}"/> </writer> """ eplcoverage = False # pre-10.7 name # (importantly we do NOT set the newer -XeplCoverage here, since that would stop us using it to override -XcodeCoverage if desired) def setup(self): BaseRunner.setup(self) self.__eplCoverageFiles = [] def processCoverageData(self, **kwargs): super(ApamaRunner, self).processCoverageData(**kwargs) if not apama.coverage.EPLCoverageWriter._isEPLCoverageEnabled(self): return runneroutput = self.output # keep same path as pre-10.5, even though the new PySys has changed the default runner output dir if os.path.basename(runneroutput) == 'pysys-runner': runneroutput = os.path.dirname(runneroutput) cov = self.project.getProperty('eplCoverageDir', runneroutput+'/eplcoverage') assert cov and cov!='.', cov cov = os.path.normpath(os.path.join(os.path.dirname(self.output), cov)) deletedir(cov) mkdir(cov) if len(self.__eplCoverageFiles) == 0: self.log.info('No tests generated any EPL code coverage files (hint: check your correlators were shutdown cleanly)') return self.log.info('Writing EPL code coverage output to: %s', cov) with io.open(cov+'/coverage_files.txt', 'w', encoding='utf-8') as f: # Writes Generated EPL Coverage file names to coverage_file.txt - always utf-8 for l in self.__eplCoverageFiles: if isinstance(l, bytes): # we have to guess at the encoding; utf-8 is slightly safer than local/default encoding # since at least 7-bit ascii chars will always work in utf8 l = l.decode('utf-8') print(l, file=f) arguments=[ '--output', cov, '--exclude', '**/Input/**.mon', # test files aren't interesting '--exclude', '**/*.tmp.mon', # test files aren't interesting # exclude all files from Apama monitor dir. '--exclude', self.project.APAMA_HOME+'/monitors/**', # not useful '--exclude', '**/*.qry.mon', ] def tostringlist(s): if not s: return [] # accept either a string or a list if isinstance(s, str): if ',' in s: return [s.strip() for s in s.split(',')] return [s] return s for s in tostringlist(getattr(self, 'eplcoverageinclude', '')): arguments.extend(['--include', s]) for s in tostringlist(getattr(self, 'eplcoverageexclude', '')): arguments.extend(['--exclude', s]) for s in tostringlist(getattr(self, 'eplcoveragesource', '')): arguments.extend(['--source', s]) for s in tostringlist(getattr(self, 'eplcoverageargs', '')): arguments.extend([s]) if hasattr(self, 'eplcoveragetitle'): arguments.extend(['--title', getattr(self, 'eplcoveragetitle', '')]) arguments.append('@%s/coverage_files.txt'%cov) self.startProcess(PROJECT.APAMA_HOME+'/bin/epl_coverage', arguments=arguments, state=FOREGROUND, environs=dict(os.environ), # need this so we have APAMA_JRE/JAVA_HOME etc available (on Windows, or from Unix core edition) displayName='epl_coverage (with %d coverage files)'%(len(self.__eplCoverageFiles)), stdouterr=cov+'/epl_coverage', workingDir=cov ) self.publishArtifact(cov, 'EPLCoverageDir') def isPurgableFile(self, path): # override return not path.endswith('.eplcoverage') and BaseRunner.isPurgableFile(self, path) def testComplete(self, testObj, dir): if apama.coverage.EPLCoverageWriter._isEPLCoverageEnabled(self) and os.path.exists(dir): for p in os.listdir(dir): if p.endswith('.eplcoverage'): self.__eplCoverageFiles.append(dir+'/'+p) BaseRunner.testComplete(self, testObj, dir)