Source code for apama.runner

#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights
# Copyright (c) 2013,2015-2016, 2018-2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains the deprecated PySys custom runner that adds support for EPL code coverage reporting. 
"""

from __future__ import print_function
import sys, stat, os, string, logging, socket, copy, random, types

import pysys
from pysys import rootLogger
from pysys.constants import *
from pysys.exceptions import *
from pysys.baserunner import BaseRunner
from pysys.utils.pycompat import *
from pysys.utils.fileutils import deletedir, mkdir, pathexists
import apama.coverage

log = logging.getLogger('pysys.apama.coverage')

class ApamaRunner(BaseRunner):
    """
    (Deprecated) A custom PySys runner for Apama tests.

    Supports the ability to generate EPL code coverage reports using ``pysys run -Xeplcoverage``.

    Coverage settings can be configured by setting the following attributes on this class in code or via -X options:
    ``eplcoverageargs, eplcoveragesource, eplcoverageinclude, eplcoverageexclude, eplcoveragetitle``.

    This class is deprecated. Remove the ApamaRunner from your project configuration and instead add an instance of
    `apama.coverage.EPLCoverageWriter`::

        <writer classname="apama.coverage.EPLCoverageWriter">
            <property name="destDir" value="__coverage_epl.${outDirName}"/>
        </writer>

    """

    eplcoverage = False  # pre-10.7 name
    # (importantly we do NOT set the newer -XeplCoverage here, since that would stop us using it to override -XcodeCoverage if desired)

    def setup(self):
        """Perform the standard runner setup and start with an empty list of EPL coverage files."""
        BaseRunner.setup(self)
        self.__eplCoverageFiles = []

    def processCoverageData(self, **kwargs):
        """Generate the EPL coverage report from the ``.eplcoverage`` files collected by `testComplete`.

        Delegates to the parent implementation first, then does nothing further unless
        ``EPLCoverageWriter._isEPLCoverageEnabled`` reports that EPL coverage is enabled for this runner.
        Otherwise the coverage output directory is deleted and recreated, the collected file names are
        written to ``coverage_files.txt`` inside it, the ``epl_coverage`` tool from ``APAMA_HOME/bin`` is
        run in the foreground, and the directory is published as an ``EPLCoverageDir`` artifact.

        :param kwargs: Passed through unchanged to the parent ``processCoverageData``.
        :raises AssertionError: If the configured coverage directory is empty or ``'.'``.
        """
        super(ApamaRunner, self).processCoverageData(**kwargs)
        if not apama.coverage.EPLCoverageWriter._isEPLCoverageEnabled(self):
            return

        # keep same path as pre-10.5, even though the new PySys has changed the default runner output dir
        runneroutput = self.output
        if os.path.basename(runneroutput) == 'pysys-runner':
            runneroutput = os.path.dirname(runneroutput)

        cov = self.project.getProperty('eplCoverageDir', runneroutput + '/eplcoverage')
        # This directory is about to be deleted, so the sanity check must not be a bare ``assert``
        # (which is stripped when Python runs with -O). AssertionError is kept as the exception type
        # for backwards compatibility.
        if not cov or cov == '.':
            raise AssertionError(cov)
        cov = os.path.normpath(os.path.join(os.path.dirname(self.output), cov))

        deletedir(cov)
        mkdir(cov)

        coverageFiles = self.__eplCoverageFiles
        if not coverageFiles:
            self.log.info('No tests generated any EPL code coverage files (hint: check your correlators were shutdown cleanly)')
            return

        self.log.info('Writing EPL code coverage output to: %s', cov)

        # Writes Generated EPL Coverage file names to coverage_file.txt - always utf-8
        with openfile(cov + '/coverage_files.txt', 'w', encoding='utf-8') as f:
            for path in coverageFiles:
                if isinstance(path, binary_type):
                    # we have to guess at the encoding; utf-8 is slightly safer than local/default encoding
                    # since at least 7-bit ascii chars will always work in utf8
                    path = path.decode('utf-8')
                print(path, file=f)

        arguments = [
            '--output', cov,
            '--exclude', '**/Input/**.mon',  # test files aren't interesting
            '--exclude', '**/*.tmp.mon',  # test files aren't interesting
            # exclude all files from Apama monitor dir.
            '--exclude', self.project.APAMA_HOME + '/monitors/**',  # not useful
            '--exclude', '**/*.qry.mon',
        ]

        def tostringlist(value):
            """Normalise a setting to a list: accepts a comma-separated string, a single string or a list."""
            if not value:
                return []
            if isstring(value):
                if ',' in value:
                    return [item.strip() for item in value.split(',')]
                return [value]
            return value

        # Each of these settings may hold several values, and each value is passed with its own flag.
        for attrName, flag in (
            ('eplcoverageinclude', '--include'),
            ('eplcoverageexclude', '--exclude'),
            ('eplcoveragesource', '--source'),
        ):
            for item in tostringlist(getattr(self, attrName, '')):
                arguments.extend([flag, item])

        # Extra arguments are passed through to the tool as-is.
        arguments.extend(tostringlist(getattr(self, 'eplcoverageargs', '')))

        if hasattr(self, 'eplcoveragetitle'):
            arguments.extend(['--title', self.eplcoveragetitle])

        arguments.append('@%s/coverage_files.txt' % cov)

        # Use self.project (as for the --exclude pattern above) rather than the PROJECT global, so that
        # both references to APAMA_HOME come from the same project object.
        self.startProcess(
            self.project.APAMA_HOME + '/bin/epl_coverage',
            arguments=arguments,
            state=FOREGROUND,
            # need this so we have APAMA_JRE/JAVA_HOME etc available (on Windows, or from Unix core edition)
            environs=dict(os.environ),
            displayName='epl_coverage (with %d coverage files)' % (len(coverageFiles)),
            stdouterr=cov + '/epl_coverage',
            workingDir=cov,
        )

        self.publishArtifact(cov, 'EPLCoverageDir')

    def isPurgableFile(self, path):  # override
        """Return False for ``.eplcoverage`` files; otherwise defer to ``BaseRunner.isPurgableFile``.

        :param path: Path of the file being considered.
        """
        return not path.endswith('.eplcoverage') and BaseRunner.isPurgableFile(self, path)

    def testComplete(self, testObj, dir):
        """Record any ``.eplcoverage`` files found in a completed test's directory, then run the standard handling.

        Files are only collected when EPL coverage is enabled and ``dir`` exists.

        :param testObj: The completed test object, passed through to ``BaseRunner.testComplete``.
        :param dir: The directory to scan for ``.eplcoverage`` files.
        """
        if apama.coverage.EPLCoverageWriter._isEPLCoverageEnabled(self) and os.path.exists(dir):
            self.__eplCoverageFiles.extend(
                dir + '/' + name for name in os.listdir(dir) if name.endswith('.eplcoverage')
            )
        BaseRunner.testComplete(self, testObj, dir)