#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2020 M.B. Grieve
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
Contains `pysys.process.user.ProcessUser` which supports using processes from PySys, and provides the
shared functionality of subclasses `pysys.basetest.BaseTest` and `pysys.baserunner.BaseRunner`.
"""
import time, collections, inspect, locale, fnmatch, sys
import threading
import shutil
from pysys import log, process_lock
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.filegrep import getmatches
from pysys.utils.logutils import BaseLogFormatter
from pysys.xml.project import Project
from pysys.process.helper import ProcessWrapper
from pysys.utils.allocport import TCPPortOwner
from pysys.utils.fileutils import mkdir, deletedir, pathexists, toLongPathSafe
from pysys.utils.pycompat import *
from pysys.utils.stringutils import compareVersions
import pysys.internal.safe_eval
# Two-field named tuple type: instances can be unpacked as ``(stdout, stderr)`` or read via
# the ``.stdout`` / ``.stderr`` attributes.
STDOUTERR_TUPLE = collections.namedtuple('stdouterr', ['stdout', 'stderr'])
"""
Returned by `ProcessUser.allocateUniqueStdOutErr` to hold a pair of ``(stdout,stderr)`` names.
"""
[docs]class ProcessUser(object):
"""
ProcessUser provides support for safely using processes in PySys, including starting processes (with an
appropriate output directory), waiting for them to do what they're supposed to do, and finally ensuring all
processes and ports are cleaned up when each test (or the overall runner) terminates.
As the common base class of both `pysys.basetest.BaseTest` and `pysys.baserunner.BaseRunner`, ProcessUser also
holds shared functionality such as dynamic port allocation, copying files, creating directories etc, and
keeps track of the "outcome" generated by some methods (although the outcome value is only used when
it is subclassed by BaseTest).
Each ProcessUser instance is responsible for managing the lifetime of the processes started using its
`startProcess` method, and ensuring they are all terminated when the `cleanup` method is invoked. Additional
functions to be executed during cleanup can be registered using `addCleanupFunction`, for example to delete large
output directories, or terminate non-process resources such as docker containers or remote servers.
Apart from the `addOutcome` method this class is not thread-safe, so if
you need to access it from multiple threads be sure to add your own locking
around use of its fields and methods, including any cleanup functions.
:ivar str ~.input: Full path to the directory containing input files (e.g. ``testdir/Input``)
:ivar str ~.output: Full path to the directory that output files should be written to (e.g. ``testdir/Output/<platformname>``)
:ivar logging.Logger ~.log: The Python ``Logger`` instance that should be used to record progress and status information.
:ivar pysys.xml.project.Project ~.project: A reference to the singleton project instance containing the
configuration of this PySys test project as defined by ``pysysproject.xml``.
The project can be used to access information such as the project properties which are shared across all tests
(e.g. for hosts and credentials).
:ivar bool ~.disableCoverage: Set to True to disable all code coverage collection for processes
started from this instance. For example, to disable coverage in tests tagged with the
'performance' group you could use a line like this in your BaseTest::
if 'performance' in self.descriptor.groups: self.disableCoverage = True
The built-in Python code coverage functionality in L{startPython} checks this
flag. It is recommended that any other languages supporting code coverage
also check the self.disableCoverage flag.
"""
def __init__(self):
    """Initialise the logger, project reference, process bookkeeping, outcome state and lock.

    The ``defaultAbortOnError`` and ``defaultIgnoreExitStatus`` values are read from
    same-named project attributes when the project defines them (compared
    case-insensitively against ``'true'``), otherwise built-in fallbacks are used.
    """
    self.log = log
    """The logger instance that should be used to log from this class. """

    self.project = Project.getInstance()
    """The L{pysys.xml.project.Project} instance containing settings for this PySys project."""
    # a missing project is tolerated only while doctest-ing
    assert self.project or 'doctest' in sys.argv[0], 'Project was not loaded yet' # allow it only during doctest-ing

    # bookkeeping for processes started by this instance
    self.processList = []
    self.processCount = {}
    self.__cleanupFunctions = []

    self.outcome = [] # internal, do NOT use directly
    self.__outcomeReason = ''

    # project-level defaults, falling back to built-in values when the project does not define them
    if hasattr(self.project, 'defaultAbortOnError'):
        self.defaultAbortOnError = self.project.defaultAbortOnError.lower()=='true'
    else:
        self.defaultAbortOnError = DEFAULT_ABORT_ON_ERROR

    if hasattr(self.project, 'defaultIgnoreExitStatus'):
        self.defaultIgnoreExitStatus = self.project.defaultIgnoreExitStatus.lower()=='true'
    else:
        self.defaultIgnoreExitStatus = True

    # counters used to generate unique stdout/stderr names and coverage file names
    self.__uniqueProcessKeys = {}
    self.__pythonCoverageFile = 0

    self.disableCoverage = False

    self.lock = threading.RLock()
    """
    A recursive lock that can be used for protecting the fields of this instance
    from access by background threads, as needed.
    """
def __getattr__(self, name):
"""Set self.input or self.output to the current working directory if not defined.
"""
if name == "input" or name == "output":
return os.getcwd()
else:
raise AttributeError("Unknown class attribute ", name)
def allocateUniqueStdOutErr(self, processKey):
    """Return a fresh pair of stdout/stderr filenames for the given logical process name.

    Names have the form ``processKey[.n].out`` and ``processKey[.n].err`` and are
    suitable for the `startProcess` ``stdouterr`` parameter. The first call for a
    given key produces ``('myprocess.out', 'myprocess.err')``, the next
    ``('myprocess.1.out', 'myprocess.1.err')``, then
    ``('myprocess.2.out', 'myprocess.2.err')`` and so on. Each name is joined
    onto ``self.output``.

    :param str processKey: A user-defined identifier that will form the prefix onto which ``[.n].out`` is appended
    :return: A STDOUTERR_TUPLE named tuple of (stdout, stderr)
    :rtype: STDOUTERR_TUPLE
    """
    counters = self.__uniqueProcessKeys
    index = counters.get(processKey, -1) + 1
    counters[processKey] = index

    # only the second and later allocations for a key get a numeric suffix
    base = processKey
    if index > 0:
        base = base + '.%d' % (index)

    stdoutName = os.path.join(self.output, base + '.out')
    stderrName = os.path.join(self.output, base + '.err')
    return STDOUTERR_TUPLE(stdoutName, stderrName)
[docs] def getInstanceCount(self, displayName):
"""(Deprecated) Return the number of processes started within the testcase matching the supplied displayName.
:deprecated: The recommended way to allocate unique names is now L{allocateUniqueStdOutErr}
The ProcessUser class maintains a reference count of processes started within the class instance
via the L{startProcess()} method. The reference count is maintained against a logical name for
the process, which is the C{displayName} used in the method call to L{startProcess()}, or the
basename of the command if no displayName was supplied. The method returns the number of
processes started with the supplied logical name, or 0 if no processes have been started.
:param displayName: The process display name
:return: The number of processes started matching the command basename
:rtype: int
"""
if displayName in self.processCount:
return self.processCount[displayName]
else:
return 0
def setKeywordArgs(self, xargs):
"""Set the xargs as data attributes of the class. For internal use by BaseTest/BaseRunner only.
:meta private:
Values in the xargs dictionary are set as data attributes using the builtin C{setattr} method.
Thus an xargs dictionary of the form C{{'foo': 'bar'}} will result in a data attribute of the
form C{self.foo} with C{value bar}. This is used so that subclasses can define default values of
data attributes, which can be overriden on instantiation e.g. using the -X options to the
runTest.py launch executable.
If an existing attribute is present on this test class (typically a
static class variable) and it has a type of bool, int or float, then
any -X options will be automatically converted from string to that type.
This facilitates providing default values for parameters such as
iteration count or timeouts as static class variables with the
possibility of overriding on the command line, for example `-Xiterations=123`.
:param xargs: A dictionary of the user defined extra arguments
"""
for key in list(xargs.keys()):
val = xargs[key]
basetestDefaultValue = getattr(self, key, None) # most of the time these will not be on the basetest
if basetestDefaultValue is not None and isstring(val):
# attempt type coersion to keep the type the same
if basetestDefaultValue is True or basetestDefaultValue is False:
val = val.lower()=='true'
elif isinstance(basetestDefaultValue, int):
val = int(val)
elif isinstance(basetestDefaultValue, float):
val = float(val)
setattr(self, key, val)
[docs] def getBoolProperty(self, propertyName, default=False):
"""
Get a True/False indicating whether the specified property is set
on this object (typically as a result of specifying -X on the command
line), or else from the project configuration.
:param propertyName: The name of a property set on the command line
or project configuration.
"""
val = getattr(self, propertyName, None)
if val is None: val = getattr(self.project, propertyName, None)
if val is None: return default
if val==True or val==False: return val
return val.lower()=='true'
[docs] def startPython(self, arguments, disableCoverage=False, **kwargs):
"""
Start a Python process with the specified arguments.
Uses the same Python process the tests are running under.
If PySys was run with the arguments ``-X pythonCoverage=true`` then
`startPython` will add the necessary arguments to enable generation of
code coverage. Note that this requried the coverage.py library to be
installed. If a project property called `pythonCoverageArgs` exists
then its value will be added as (space-delimited) arguments to the
coverage tool.
:param arguments: The arguments to pass to the Python executable.
Typically the first one be either the name of a Python script
to execute, or ``-m`` followed by a module name.
:param kwargs: See L{startProcess} for detail on available arguments.
:param disableCoverage: Disables code coverage for this specific
process. Coverage can also be disabled by setting
``self.disableCoverage==True`` on this test instance.
:return: The process handle of the process.
:rtype: pysys.process.commonwrapper.CommonProcessWrapper
"""
args = arguments
if 'environs' in kwargs:
environs = kwargs['environs']
else:
environs = kwargs.setdefault('environs', self.getDefaultEnvirons(command=sys.executable))
if self.getBoolProperty('pythonCoverage') and not disableCoverage and not self.disableCoverage:
if hasattr(self.project, 'pythonCoverageArgs'):
args = [a for a in self.project.pythonCoverageArgs.split(' ') if a]+args
args = ['-m', 'coverage', 'run']+args
if 'COVERAGE_FILE' not in environs:
kwargs['environs'] = dict(environs)
with self.lock:
self.__pythonCoverageFile += 1
kwargs['environs']['COVERAGE_FILE'] = self.output+'/.coverage.python.%02d'%(self.__pythonCoverageFile)
return self.startProcess(sys.executable, arguments=args, **kwargs)
def startProcess(self, command, arguments, environs=None, workingDir=None, state=None,
        timeout=TIMEOUTS['WaitForProcess'], stdout=None, stderr=None, displayName=None,
        abortOnError=None, expectedExitStatus='==0', ignoreExitStatus=None, quiet=False, stdouterr=None,
        background=False):
    """Start a process running in the foreground or background, and return
    the `pysys.process.commonwrapper.CommonProcessWrapper` object.

    Typical use is::

        myexecutable = self.startProcess('path_to_my_executable',
            arguments=['myoperation', 'arg1','arg2'],
            environs=self.createEnvirons(addToLibPath=['my_ld_lib_path']), # if a customized environment is needed
            stdouterr=self.allocateUniqueStdOutErr('myoperation'), # for stdout/err files, pick a suitable logical name for what it's doing
            background=True # or remove for default behaviour of executing in foreground
            )

    The method allows spawning of new processes in a platform independent way. The command, arguments,
    environment and working directory to run the process in can all be specified in the arguments to the
    method, along with the filenames used for capturing the stdout and stderr of the process. Processes may
    be started in the foreground, in which case the method does not return until the process has completed
    or a time out occurs, or in the background in which case the method returns immediately to the caller
    returning a handle to the process to allow manipulation at a later stage, typically with L{waitProcess}.
    All processes started in the background and not explicitly killed using the returned process
    object are automatically killed on completion of the test via the L{cleanup()} destructor.

    When starting a process that will listen on a server socket, use `getNextAvailableTCPPort`
    to allocate a free port before calling this method.

    :param str command: The path to the executable to be launched (should include the full path)
    :param list[str] arguments: A list of arguments to pass to the command

    :param dict(str,str) environs: A dictionary specifying the environment to run the process in.
        If a None or empty dictionary is passed, L{getDefaultEnvirons} will be invoked to
        produce a suitable clean default environment for this `command`, containing a minimal set of variables.
        If you wish to specify a customized environment, L{createEnvirons()} is a great way to create it.

    :param str workingDir: The working directory for the process to run in (defaults to the testcase output subdirectory)

    :param bool background: Set to True to start the process in the background. By default processes are started
        in the foreground, meaning execution of the test will continue only once the process has terminated.

    :param state: Alternative way to set ``background=True``.
        Run the process either in the C{FOREGROUND} or C{BACKGROUND} (defaults to C{FOREGROUND}). Setting
        state=BACKGROUND is equivalent to setting background=True; in new tests using background=True is the
        preferred way to do this.

    :param int timeout: The number of seconds after which to terminate processes running in the foreground. For processes
        that complete in a few seconds or less, it is best to avoid overriding this and stick with the default.
        However for long-running foreground processes it will be necessary to set a larger number, for example
        if running a soak test where the process needs to run for up to 2 hours you could set ``timeout=2*60*60``.

    :param str stdouterr: The filename prefix to use for the stdout and stderr of the process
        (`.out`/`.err` will be appended), or a tuple of (stdout,stderr) as returned from
        L{allocateUniqueStdOutErr}.
        The stdouterr prefix is also used to form a default display name for
        the process if none is explicitly provided.
        The files are created relative to the test output directory.
        The filenames can be accessed from the returned process object using
        L{pysys.process.commonwrapper.CommonProcessWrapper.stdout} and
        L{pysys.process.commonwrapper.CommonProcessWrapper.stderr}.

    :param str stdout: The filename used to capture the stdout of the process. It is usually simpler to use `stdouterr` instead of this.
    :param str stderr: The filename used to capture the stderr of the process. It is usually simpler to use `stdouterr` instead of this.

    :param str displayName: Logical name of the process used for display in log messages, and the str(...)
        representation of the returned process object
        (defaults to a string generated from the stdouterr and/or the command).

    :param bool abortOnError: If true abort the test on any error outcome (defaults to the defaultAbortOnError
        project setting)

    :param str expectedExitStatus: The condition string used to determine whether the exit status/code
        returned by the process is correct. The default is '==0', as an exit code of zero usually indicates success, but if you
        are expecting a non-zero exit status (for example because you are testing correct handling of
        a failure condition) this could be set to '!=0' or a specific value such as '==5'.

    :param bool ignoreExitStatus: If False, a BLOCKED outcome is added if the process terminates with an
        exit code that doesn't match expectedExitStatus (or if the command cannot be run at all).
        This can be set to True in cases where you do not care whether the command succeeds or fails, or wish to handle the
        exit status separately with more complicated logic.

        The default value of ignoreExitStatus=None means the value will
        be taken from the project property defaultIgnoreExitStatus, which can be configured in the project XML
        (the recommended default property value is defaultIgnoreExitStatus=False), or is set to True for
        compatibility with older PySys releases if no project property is set.

    :param bool quiet: If True, this method will not do any INFO or WARN level logging
        (only DEBUG level), unless a failure outcome is appended. This parameter can be
        useful to avoid filling up the log where it is necessary to repeatedly execute a
        command check for completion of some operation until it succeeds; in such cases
        you should usually set ignoreExitStatus=True as well since both success and
        failure exit statuses are valid.

    :return: The process wrapper object.
    :rtype: pysys.process.commonwrapper.CommonProcessWrapper

    :raises Exception: If ``stdouterr`` is specified together with ``stdout`` or ``stderr``.
    """
    if state is None: state = FOREGROUND
    if background: state = BACKGROUND
    if ignoreExitStatus is None: ignoreExitStatus = self.defaultIgnoreExitStatus
    if abortOnError is None: abortOnError = self.defaultAbortOnError
    # a relative (or missing) working directory is resolved against the output directory
    workingDir = os.path.join(self.output, workingDir or '')

    if stdouterr:
        if stdout or stderr: raise Exception('Cannot specify both stdouterr and stdout/stderr')
        if isstring(stdouterr):
            stdout = stdouterr+'.out'
            stderr = stdouterr+'.err'
        else:
            stdout, stderr = stdouterr
        if not displayName and stdout:
            # Heuristically the name selected by the user for stdout/err usually represents the
            # logical purpose of the process so makes a great display name.
            # Also add the command (unless they're the same).
            # NB: We do not do this if stdout/stderr are used since that could break
            # behaviour for old tests using getInstanceCount.
            displayName = os.path.basename(stdout)
            # strip only the trailing extension; removing every '.out' occurrence would
            # corrupt names such as 'my.outbound.out'
            if displayName.endswith('.out'): displayName = displayName[:-len('.out')]
            if os.path.basename(command) not in displayName and displayName not in command:
                displayName = '%s<%s>'%(os.path.basename(command), displayName)

    # in case stdout/err were given as non-absolute paths, make sure they go to the output dir not the cwd
    if stdout: stdout = os.path.join(self.output, stdout)
    if stderr: stderr = os.path.join(self.output, stderr)

    if not displayName: displayName = os.path.basename(command)

    if not environs: # a truly empty env isn't really usable, so populate it with a minimal default environment instead
        environs = self.getDefaultEnvirons(command=command)

    try:
        startTime = time.time()
        process = ProcessWrapper(command, arguments, environs, workingDir, state, timeout, stdout, stderr, displayName=displayName)
        process.start()
        if state == FOREGROUND:
            correctExitStatus = pysys.internal.safe_eval.safe_eval('%d %s'%(process.exitStatus, expectedExitStatus), extraNamespace={'self':self})

            # Logger.warn is a deprecated alias, so use Logger.warning
            logmethod = log.info if correctExitStatus else log.warning
            if quiet: logmethod = log.debug
            # measure once so the decision to show the duration and the value shown always agree
            duration = time.time()-startTime
            logmethod("Executed %s, exit status %d%s", displayName, process.exitStatus,
                ", duration %d secs" % (duration) if int(duration) > 0 else "")

            if not ignoreExitStatus and not correctExitStatus:
                self.addOutcome(BLOCKED,
                    ('%s returned non-zero exit code %d'%(process, process.exitStatus))
                    if expectedExitStatus=='==0' else
                    ('%s returned exit code %d (expected %s)'%(process, process.exitStatus, expectedExitStatus)),
                    abortOnError=abortOnError)

        elif state == BACKGROUND:
            (log.info if not quiet else log.debug)("Started %s with process id %d", displayName, process.pid)
    except ProcessError as e:
        if not ignoreExitStatus:
            self.addOutcome(BLOCKED, 'Could not start %s process: %s'%(displayName, e), abortOnError=abortOnError)
        else: # this wouldn't happen during a polling-until-success use case so is always worth logging even in quiet mode
            log.info("%s", sys.exc_info()[1], exc_info=0)
    except ProcessTimeout:
        self.addOutcome(TIMEDOUT, '%s timed out after %d seconds'%(process, timeout), printReason=False, abortOnError=abortOnError)
        (log.warning if not quiet else log.debug)("Process %r timed out after %d seconds, stopping process", process, timeout, extra=BaseLogFormatter.tag(LOG_TIMEOUTS))
        process.stop()
    else:
        # only processes that started (and did not time out) are tracked for cleanup and counted
        with self.lock:
            self.processList.append(process)
            if displayName in self.processCount:
                self.processCount[displayName] = self.processCount[displayName] + 1
            else:
                self.processCount[displayName] = 1
    # NOTE(review): `process` is only bound once the ProcessWrapper constructor has returned;
    # this assumes the constructor itself never raises ProcessError/ProcessTimeout - TODO confirm
    return process
def getDefaultEnvirons(self, command=None, **kwargs):
    """
    Build a new dictionary of environment variables, suitable for passing to
    L{startProcess()}, holding a minimal clean set of variables for this
    platform that is (as far as possible) independent of the environment the
    tests themselves are running under.

    The result contains a minimal PATH/LD_LIBRARY_PATH but does not try to
    replicate every default environment variable of each OS, and in
    particular omits anything identifying the current username or home area.
    Additional variables can be added as needed with L{createEnvirons}
    overrides. If you don't care about minimizing the risk of your local
    environment affecting the test processes you start, just use
    C{environs=os.environ} to let child processes inherit the entire parent
    environment.

    The L{createEnvirons()} and L{startProcess()} methods use this as the
    basis for creating a new set of default environment variables.

    Subclasses may override this method to add common environment variables
    for every process invoked by startProcess (for example to enable code
    coverage for Java/Python/etc), or to customize behaviour for different
    operating systems.

    Some features of this method can be configured by setting project
    properties:

      - ``defaultEnvironsDefaultLang``: if set to a value such as ``en_US.UTF-8``
        the specified value is set for the LANG= variable on Unix; otherwise,
        the LANG variable is not set (which might result in use of the
        legacy POSIX/C encoding).
      - ``defaultEnvironsTempDir``: if set, the expression is evaluated as a
        Python expression (with ``self`` available) and the resulting
        directory is created and used for the OS-specific temp directory
        environment variables. A typical value is `self.output`.
      - ``defaultEnvironsLegacyMode``: set to true to enable compatibility
        mode which keeps the behaviour the same as PySys v1.1, 1.2 and 1.3,
        namely using a completely empty default environment on Unix, and
        a copy of the entire parent environment on Windows. This is not
        recommended unless you have a lot of legacy tests that cannot
        easily be changed to only set minimal required environment
        variables using `createEnvirons()`.

    :param command: If known, the full path of the executable for which
        a default environment is being created (when called from `startProcess`
        this is always set). This allows default environment variables to be
        customized for different process types e.g. Java, Python, etc.

        When using ``command=sys.executable`` to launch another copy of the
        current Python executable, extra items from this process's path
        environment variables are added to the returned dictionary so that it
        can start correctly. On Unix-based systems this includes copying all of
        the load library path environment variable from the parent process.

    :param kwargs: Overrides of this method should pass any additional
        kwargs down to the super implementation, to allow for future extensions.

    :return: A new dictionary containing the environment variables.
    """
    assert not kwargs, 'Unknown keyword arguments: %s'%kwargs.keys()

    project = self.project

    # this feature is a workaround to maintain compatibility for a bug in PySys v1.1-1.3
    # (see https://github.com/pysys-test/pysys-test/issues/9 for details)
    legacyMode = getattr(project, 'defaultEnvironsLegacyMode','').lower()=='true'
    if legacyMode:
        return dict(os.environ) if IS_WINDOWS else {}

    env = {}

    # allows setting TEMP to output dir to avoid contamination/filling up of system location
    if getattr(project, 'defaultEnvironsTempDir',None) is not None:
        tempDir = pysys.internal.safe_eval.safe_eval(project.defaultEnvironsTempDir, extraNamespace={'self':self})
        self.mkdir(tempDir)
        tempDir = os.path.normpath(tempDir)
        tempVarNames = ('TEMP', 'TMP') if IS_WINDOWS else ('TMPDIR',)
        for varName in tempVarNames:
            env[varName] = tempDir

    # env vars where it is safe and useful to inherit parent values;
    # avoid anything user-specific or that might cause tests to store data
    # outside the test output directory
    inheritedNames = []
    if IS_WINDOWS:
        # for windows there are lots; as a matter of policy we set this to a small
        # minimal set used by a lot of programs. Keeping up with every single env
        # var Microsoft sets in every Windows OS release would be too painful,
        # and better to make users explicitly opt-in to the env vars they want
        inheritedNames += [
            'ComSpec', 'OS', 'PATHEXT', 'SystemRoot', 'SystemDrive', 'windir',
            'NUMBER_OF_PROCESSORS', 'PROCESSOR_ARCHITECTURE',
            'COMMONPROGRAMFILES', 'COMMONPROGRAMFILES(X86)', 'PROGRAMFILES', 'PROGRAMFILES(X86)',
            'SYSTEM', 'SYSTEM32',
        ]
    env.update((varName, os.environ[varName]) for varName in inheritedNames if varName in os.environ)

    # always set PATH/LD_LIB_PATH to clean values from constants.py
    # note that if someone is using an OS with different defaults they won't
    # be able to edit constants.py but will be able to provide a custom
    # implementation of this method
    env['PATH'] = PATH
    for varName, cleanValue in (('LD_LIBRARY_PATH', LD_LIBRARY_PATH), ('DYLD_LIBRARY_PATH', DYLD_LIBRARY_PATH)):
        if cleanValue:
            env[varName] = cleanValue

    if not IS_WINDOWS:
        defaultLang = getattr(project, 'defaultEnvironsDefaultLang','')
        if defaultLang:
            env['LANG'] = defaultLang

    if command != sys.executable:
        return env

    # Ensure it's possible to run another instance of this Python, by adding it to the start of the path env vars
    # (but only if full path to the Python executable exactly matches).
    # Keep it as clean as possible by not passing sys.path/PYTHONPATH
    # - but it seems we do need to copy the LD_LIBRARY_PATH from the parent process to ensure the required libraries are present.
    # Do not set PYTHONHOME here, as doesn't work well in virtualenv, and messes up grandchildren
    # processes that need a different Python version
    env['PATH'] = os.path.dirname(sys.executable)+os.pathsep+env['PATH']

    if LIBRARY_PATH_ENV_VAR == 'PATH':
        # PATH doubles as the library path on this OS, so there is nothing further to copy
        self.log.debug('getDefaultEnvirons was called with a command matching this Python executable; adding required path environment variables from parent environment')
        return env

    # It's a shame it's necessary to copy parent environment, but there's no sane way to unpick which libraries are
    # actually required on Unix. Make sure we don't set this env var to an empty string just in case that
    # does anything weird.
    parentLibPath = os.getenv(LIBRARY_PATH_ENV_VAR,'')
    combinedLibPath = (parentLibPath+os.pathsep+env.get(LIBRARY_PATH_ENV_VAR,'')).strip(os.pathsep)
    if combinedLibPath:
        env[LIBRARY_PATH_ENV_VAR] = combinedLibPath
    self.log.debug('getDefaultEnvirons was called with a command matching this Python executable; adding required path environment variables from parent environment, including %s=%s', LIBRARY_PATH_ENV_VAR, os.getenv(LIBRARY_PATH_ENV_VAR,''))
    return env
[docs] def createEnvirons(self, overrides=None, addToLibPath=[], addToExePath=[], command=None, **kwargs):
"""
Create a new customized dictionary of environment variables suitable
for passing to L{startProcess()}'s ``environs=`` argument.
As a starting point, this method uses the value returned by
L{getDefaultEnvirons()} for this `command`. See the documentation on
that method for more details. If you don't care about minimizing the
risk of your local environment affecting the test processes you start,
just use C{environs=os.environ} to allow child processes to inherit the
entire parent environment instead of using this method.
:param overrides: A dictionary of environment variables whose
values will be used instead of any existing values.
You can use `os.getenv('VARNAME','')` if you need to pass selected
variables from the current process as part of the overrides list.
If the value is set to None then any variable of this name will be
deleted. Use unicode strings if possible (byte strings will be
converted depending on the platform).
A list of dictionaries can be specified, in which case the latest
will override the earlier if there are any conflicts.
:param addToLibPath: A path or list of paths to be prepended to the
default value for the environment variable used to load libraries
(or the value specified in overrides, if any),
i.e. ``[DY]LD_LIBRARY_PATH`` on Unix or ``PATH`` on Windows. This is usually
more convenient than adding it directly to `overrides`.
:param addToExePath: A path or list of paths to be prepended to the
default value for the environment variable used to locate executables
(or the value specified in overrides, if any),
i.e. ``PATH`` on both Unix and Windows. This is usually
more convenient than adding it directly to ``overrides``.
:param command: If known, the full path of the executable for which
a default environment is being created (passed to L{getDefaultEnvirons}).
:param kwargs: Overrides of this method should pass any additional
kwargs down to the super implementation, to allow for future extensions.
:return: A new dictionary containing the environment variables.
"""
assert not kwargs, 'Unknown keyword arguments: %s'%kwargs.keys()
e = self.getDefaultEnvirons(command=command)
if overrides:
if not isinstance(overrides, list): overrides = [overrides]
for d in overrides:
if d:
for k in d:
if k.upper() in ['PATH']: k = k.upper() # normalize common ones to avoid chance of duplicates
if d[k] is None:
e.pop(k, None) # remove
else:
e[k] = d[k]
def preparepath(path):
if isstring(path):
if os.pathsep not in path: path = os.path.normpath(path)
else:
path = os.pathsep.join([os.path.normpath(p) for p in path if p])
return path
if addToLibPath:
e[LIBRARY_PATH_ENV_VAR] = preparepath(addToLibPath)+os.pathsep+e[LIBRARY_PATH_ENV_VAR]
if addToExePath:
e['PATH'] = preparepath(addToExePath)+os.pathsep+e['PATH']
return e
def stopProcess(self, process, abortOnError=None):
    """Stops the specified process, if it is currently running.

    Does nothing if the process is not running.

    This is equivalent to calling `pysys.process.commonwrapper.CommonProcessWrapper.stop()`, except it also
    logs an info message when the process is stopped.

    :param process: The process handle returned from the L{startProcess} method
    :param abortOnError: If True abort the test on any error outcome (defaults to the defaultAbortOnError
        project setting), if False a failure to stop the process will just be logged as a warning.
    """
    if abortOnError is None: abortOnError = self.defaultAbortOnError

    # nothing to do for a process that has already terminated
    if not process.running():
        return

    try:
        process.stop()
        log.info("Stopped process %r", process)
    except ProcessError as e:
        if abortOnError:
            self.abort(BLOCKED, 'Unable to stop process %r'%(process), self.__callRecord())
        else:
            # Logger.warn is a deprecated alias; warning() is the supported spelling
            log.warning("Ignoring failure to stop process %r due to: %s", process, e)
def signalProcess(self, process, signal, abortOnError=None):
    """Send a signal to a running process.

    This method uses the `pysys.process.commonwrapper.CommonProcessWrapper.signal` to send a signal to a
    running process. Does nothing if the process is not running.

    Should the request to send the signal fail, the test is aborted with a C{BLOCKED} outcome, unless
    abortOnError is False in which case the failure is only logged as a warning.

    :param process: The process handle returned from the L{startProcess} method
    :param signal: The integer value of the signal to send
    :param abortOnError: If True aborts the test with an exception on any error, if False just log it as a warning.
        (defaults to the defaultAbortOnError project setting)
    """
    if abortOnError is None: abortOnError = self.defaultAbortOnError

    # signalling a process that has already terminated is a no-op
    if not process.running():
        return

    try:
        process.signal(signal)
        log.info("Sent %d signal to process %r", signal, process)
    except ProcessError as e:
        if abortOnError:
            self.abort(BLOCKED, 'Unable to signal process %r'%(process), self.__callRecord())
        else:
            # Logger.warn is a deprecated alias; warning() is the supported spelling
            log.warning("Ignoring failure to signal process %r due to: %s", process, e)
def waitProcess(self, process, timeout, abortOnError=None):
    """Wait for a background process to terminate, completing on termination or expiry of the timeout.

    Timeouts will result in an exception unless the project property ``defaultAbortOnError==False``.

    This method does not check the exit code for success, but you can manually
    check the return value (which is the same as ``process.exitStatus``) using `assertThat` if you wish to
    check it succeeded.

    :param process: The process handle returned from the L{startProcess} method
    :param timeout: The timeout value in seconds to wait before returning; must be greater than zero
    :param abortOnError: If True aborts the test with an exception on any error, if False just log it as a warning.
        (defaults to the defaultAbortOnError project setting)
    :return: The process's ``exitStatus``. This will be None if the process timed out and abortOnError is disabled.
    """
    if abortOnError is None: abortOnError = self.defaultAbortOnError
    assert timeout > 0, 'timeout must always be specified'

    log.info("Waiting up to %d secs for process %r", timeout, process)
    startTime = time.time()
    try:
        process.wait(timeout)
    except ProcessTimeout:
        if abortOnError:
            self.abort(TIMEDOUT, 'Timed out waiting for process %s after %d secs'%(process, timeout), self.__callRecord())
        else:
            # Logger.warn is a deprecated alias; warning() is the supported spelling
            log.warning("Ignoring timeout waiting for process %r after %d secs", process, time.time() - startTime, extra=BaseLogFormatter.tag(LOG_TIMEOUTS))
    else:
        # only mention completion for slow processes, to avoid cluttering the log
        elapsed = time.time()-startTime
        if elapsed > 10:
            log.info("Process %s terminated after %d secs", process, elapsed)
    return process.exitStatus
def writeProcess(self, process, data, addNewLine=True):
    """Write binary data to the stdin of a process.

    This method uses `pysys.process.commonwrapper.CommonProcessWrapper.write` to write binary data to the stdin
    of a process. This wrapper around the write method of the process helper only adds checking of the process
    running status prior to the write being performed, and logging to the testcase run log to detail the write.

    :param process: The process handle returned from the L{startProcess()} method
    :param data: The data to write to the process stdin.
        As only binary data can be written to a process stdin,
        if a character string rather than a byte object is passed as the data,
        it will be automatically converted to a bytes object using the encoding
        given by locale.getpreferredencoding().
    :param addNewLine: True if a new line character is to be added to the end of the data string
    :raises Exception: If the process is not running, so the write could not be performed.
    """
    if not process.running():
        # format the message explicitly; passing `process` as a second constructor argument would leave
        # the %r placeholder unexpanded and turn the exception text into a tuple repr
        raise Exception("Write to process %r stdin not performed as process is not running" % (process,))

    process.write(data, addNewLine)
    log.info("Written to stdin of process %r", process)
    # pass data as a logging argument so it is only formatted if debug logging is enabled
    log.debug(" %s", data)
def waitForSocket(self, port, host='localhost', timeout=TIMEOUTS['WaitForSocket'], abortOnError=None, process=None,
        socketAddressFamily=socket.AF_INET):
    """Wait until it is possible to establish a socket connection to a
    server running on the specified local or remote port.

    This method blocks until connection to a particular host:port pair can be established. This is useful for
    test timing where a component under test creates a socket for client server interaction - calling of this
    method ensures that on return of the method call the server process is running and a client is able to
    create connections to it. If a connection cannot be made within the specified timeout interval, the method
    returns to the caller, or aborts the test if abortOnError=True.

    .. versionchanged:: 1.5.1
        Added host and socketAddressFamily parameters.

    :param port: The port value in the socket host:port pair
    :param host: The host value in the socket host:port pair
    :param timeout: The timeout in seconds to wait for connection to the socket; a false value (e.g. 0)
        disables the timeout check
    :param abortOnError: If true abort the test on any failure (defaults to the defaultAbortOnError
        project setting)
    :param process: If a handle to a process is specified, the wait will abort if
        the process dies before the socket becomes available. It is recommended to set this wherever possible.
    :param socketAddressFamily: The socket address family e.g. IPv4 vs IPv6. See Python's ``socket`` module for
        details.
    :return: True if the connection was established, False if the wait gave up (timeout or process
        termination) and abortOnError is disabled.
    """
    if abortOnError is None: abortOnError = self.defaultAbortOnError

    log.debug("Performing wait for socket creation %s:%s", host, port)

    with process_lock:
        s = socket.socket(socketAddressFamily, socket.SOCK_STREAM)
        try:
            # the following lines are to prevent handles being inherited by
            # other processes started while this test is running
            if OSFAMILY =='windows':
                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 0)
                import win32api, win32con
                win32api.SetHandleInformation(s.fileno(), win32con.HANDLE_FLAG_INHERIT, 0)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            else:
                import fcntl
                fcntl.fcntl(s.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        except Exception:
            # don't leak the socket if configuring it fails before we reach the try/finally below
            s.close()
            raise

    try:
        startTime = time.time()
        while True:
            try:
                s.connect((host, port))
                s.shutdown(socket.SHUT_RDWR)

                log.debug("Wait for socket creation completed successfully")
                if time.time()-startTime>10:
                    log.info("Wait for socket creation completed after %d secs", time.time()-startTime)
                return True
            except socket.error:
                # fail fast if the process that should be listening has already died
                if process and not process.running():
                    msg = "Waiting for socket connection aborted due to unexpected process %s termination"%(process)
                    if abortOnError:
                        self.abort(BLOCKED, msg, self.__callRecord())
                    else:
                        log.warning(msg)
                    return False

                if timeout and time.time() > startTime + timeout:
                    msg = "Timed out waiting for creation of socket after %d secs"%(time.time()-startTime)
                    if abortOnError:
                        self.abort(TIMEDOUT, msg, self.__callRecord())
                    else:
                        log.warning(msg)
                    return False
            time.sleep(0.01)
    finally:
        s.close()
def waitForFile(self, file, filedir=None, timeout=TIMEOUTS['WaitForFile'], abortOnError=None):
    """Wait for a file to exist on disk.

    This method blocks until a file is created on disk. This is useful for test timing where
    a component under test creates a file (e.g. for logging) indicating it has performed all
    initialisation actions and is ready for the test execution steps. If a file is not created
    on disk within the specified timeout interval, the test is aborted with a timeout outcome, or
    (if abortOnError is false) a warning is logged and the method returns to the caller.

    :param file: The basename of the file used to wait to be created
    :param filedir: The dirname of the file (defaults to the testcase output subdirectory)
    :param timeout: The timeout in seconds to wait for the file to be created; a false value (e.g. 0)
        disables the timeout check
    :param abortOnError: If true abort the test on any failure (defaults to the defaultAbortOnError
        project setting)
    """
    if abortOnError is None: abortOnError = self.defaultAbortOnError
    if filedir is None: filedir = self.output
    f = os.path.join(filedir, file)

    log.debug("Performing wait for file creation: %s", f)

    startTime = time.time()
    while True:
        # check for the file before anything else, so that a file which already exists is reported
        # immediately, and so that an existence check always follows each sleep before the timeout check
        if pathexists(f):
            log.debug("Wait for '%s' file creation completed successfully", file)
            return

        if timeout and time.time() > startTime + timeout:
            msg = "Timed out waiting for creation of file %s after %d secs" % (file, time.time()-startTime)
            if abortOnError:
                self.abort(TIMEDOUT, msg, self.__callRecord())
            else:
                log.warning(msg)
            return

        time.sleep(0.01)
def waitForSignal(self, file, filedir=None, expr="", **waitForGrepArgs):
    """Legacy name for `waitForGrep`; new tests should call `waitForGrep` directly.

    Accepts the same arguments as `waitForGrep`. The only difference is the positional order: here the
    (rarely used) ``filedir`` may be given as the 2nd positional argument, between ``file`` and ``expr``,
    whereas `waitForGrep` only accepts it as a ``filedir=`` keyword argument.

    :return: Whatever `waitForGrep` returns.
    """
    # fold the two named arguments in with the pass-through keywords and delegate
    forwarded = dict(waitForGrepArgs, expr=expr, filedir=filedir)
    return self.waitForGrep(file, **forwarded)
def waitForGrep(self, file, expr="", condition=">=1", timeout=TIMEOUTS['WaitForSignal'], poll=0.25,
        ignores=[], process=None, errorExpr=[], abortOnError=None, encoding=None, detailMessage='', filedir=None,
        reFlags=0):
    """Wait for a regular expression line to be seen (one or more times) in a text file in the output
    directory (waitForGrep was formerly known as `waitForSignal`).

    This method provides some parameters that give helpful fail-fast behaviour with a descriptive outcome reason;
    use these whenever possible:

      - ``process=`` to abort if success becomes impossible due to premature termination of the process that's
        generating the output
      - ``errorExpr=`` to abort if an error message/expression is written to the file

    This will generate much clearer outcome reasons, which makes test failures easy to triage,
    and also avoids wasting time waiting for something that will never happen.

    Example::

        self.waitForGrep('myprocess.log', expr='INFO .*Started successfully', process=myprocess,
            errorExpr=[' ERROR ', ' FATAL ', 'Failed to start'], encoding='utf-8')

    Note that waitForGrep fails the test if the expression is not found (unless abortOnError was set to False,
    which isn't recommended), so there is no need to add duplication with an
    `assertGrep <pysys.basetest.BaseTest.assertGrep>` to check for the same expression in your validation logic.

    The message(s) logged when there is a successful wait can be controlled with the project
    property ``verboseWaitForGrep=true/false`` (or equivalently, ``verboseWaitForSignal``); for best visibility
    into what is happening set this property to true in your ``pysysproject.xml``.

    You can extract information from the matched expression, optionally perform assertions on it, by
    using one or more ``(?P<groupName>...)`` named groups in the expression. A common
    pattern is to unpack the resulting dict using ``**kwargs`` syntax and pass to `BaseTest.assertThat`. For
    example::

        self.assertThat('username == expected', expected='myuser',
            **self.waitForGrep('myserver.log', expr=r'Successfully authenticated user "(?P<username>[^"]*)"'))

    .. versionadded:: 1.5.1

    :param str file: The path of the file to be searched. Usually this is a name/path relative to the
        ``self.output`` directory, but alternatively an absolute path can be specified.
    :param str expr: The regular expression to search for in the text file. Must be non-empty and must not
        contain a newline.
    :param str condition: The condition to be met for the number of lines matching the regular expression; by default
        we wait until there is at least one occurrence.
    :param int timeout: The number of seconds to wait for the regular expression before giving up and aborting
        the test with `pysys.constants.TIMEDOUT` (unless abortOnError=False in which case execution will continue).
    :param pysys.process.commonwrapper.CommonProcessWrapper process: The process that is generating the specified
        file, to allow the wait to fail fast (instead of timing out) if the process dies before the expected signal
        appears. Can be None if the process is not known or is expected to terminate itself during this period.
        The file is always searched once more after the process is seen to have terminated, so output written
        just before it exited is not missed.
    :param list[str] errorExpr: Optional list of regular expressions, which if found in the file will cause waiting
        for the main expression to be aborted with a `pysys.constants.BLOCKED` outcome. This is useful to avoid waiting
        a long time for the expected expression when an ERROR is logged that means it will never happen, and
        also provides much clearer test failure messages in this case.
    :param list[str] ignores: A list of regular expressions used to identify lines in the files which should be ignored
        when matching both `expr` and `errorExpr`.
    :param float poll: The time in seconds to sleep between each check of the file against the condition.
    :param bool abortOnError: If True abort the test on any error outcome (defaults to the defaultAbortOnError
        project setting, which for a modern project will be True).
    :param str encoding: The encoding to use to open the file and convert from bytes to characters.
        The default value is None which indicates that the decision will be delegated
        to the L{getDefaultFileEncoding()} method.
    :param str detailMessage: An extra string to add to the message logged when waiting to provide extra
        information about the wait condition. e.g. ``detailMessage='(downstream message received)'``.
        Added in v1.5.1.
    :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching,
        combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``.
        For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot
        be used because expressions are matched against one line at a time. Added in PySys 1.5.1.
    :param str filedir: Can be used to provide a directory name to add to the beginning of the ``file`` parameter;
        however usually it is clearer just to specify that directory in the ``file``.

    :return list[re.Match]: Usually this returns a list of ``re.Match`` objects found for the ``expr``, or an
        empty list if there was no match.

        If the expr contains any ``(?P<groupName>...)`` named groups, and assuming the condition still the
        default of ">=1" (i.e. not trying to find multiple matches), then a dict is returned
        containing ``dict(groupName: str, matchValue: str or None)`` (or an empty ``{}`` dict if there is no match)
        which allows the result to be passed to `assertThat` for further checking of the matched groups (typically
        unpacked using the ``**`` operator; see example above).
    """
    assert expr, 'expr= argument must be specified'
    assert '\n' not in expr, 'expr= cannot contain multiple lines'

    if abortOnError is None: abortOnError = self.defaultAbortOnError
    if filedir is None: filedir = self.output
    f = os.path.join(filedir, file)

    if errorExpr: assert not isstring(errorExpr), 'errorExpr must be a list of strings not a string'

    matches = []
    startTime = time.time()
    msg = "Waiting for {expr} {condition}in {file}{detail}".format(
        expr=repr(expr), # repr performs escaping of embedded quotes, newlines, etc
        condition=condition.strip()+' ' if condition!='>=1' else '', # only include if non-default
        file=os.path.basename(file),
        detail=' '+detailMessage.strip(' ') if detailMessage else ''
        )

    log.debug("Performing wait for grep signal '%s' %s in file %s with ignores %s", expr, condition, f, ignores)

    verboseWaitForSignal = self.getBoolProperty('verboseWaitForSignal', False) or self.getBoolProperty('verboseWaitForGrep', False)
    if verboseWaitForSignal:
        # if verbose, log when starting (which is very helpful for debugging hangs); non-verbose users get the message only when it's done
        log.info('%s%s', msg, '; timeout=%ss'%timeout if timeout!=TIMEOUTS['WaitForSignal'] else '')

    encoding = encoding or self.getDefaultFileEncoding(f)

    # If condition was customized (typically to be more than 1) named groups mode isn't so useful since there would
    # be multiple matches, so restrict it to 1 which is the common case anyway.
    compiled = re.compile(expr, flags=reFlags)
    namedGroupsMode = compiled.groupindex and condition.replace(' ','')=='>=1'
    timetaken = time.time()
    while True:
        # Sample the process state BEFORE reading the file. If we only checked after a failed grep, a process
        # that wrote the expected line and then exited straight away could be reported as "terminated" without
        # the file ever being searched again. Sampling first guarantees one full search after termination.
        processTerminated = process and not process.running()

        if pathexists(f):
            matches = getmatches(f, expr, encoding=encoding, ignores=ignores, flags=reFlags)

            if pysys.internal.safe_eval.safe_eval("%d %s" % (len(matches), condition), extraNamespace={'self':self}):
                timetaken = time.time()-timetaken
                # Old-style/non-verbose behaviour is to log only after complete,
                # new/verbose style does the main logging at INFO when starting, and only logs on completion if it took a long time
                # (this helps people debug tests that sometimes timeout and sometimes "nearly" timeout)
                if verboseWaitForSignal:
                    (log.info if timetaken > 30 else log.debug)(" ... found %d matches in %ss", len(matches), int(timetaken))
                else:
                    # We use the phrase "grep signal" to avoid misleading anyone, whether people used waitForGrep or the older waitForSignal
                    log.info("Wait for grep signal in %s completed successfully", file)
                break

            if errorExpr:
                for err in errorExpr:
                    errmatches = getmatches(f, err+'.*', encoding=encoding, ignores=ignores, flags=reFlags) # add .* to capture entire err msg for a better outcome reason
                    if errmatches:
                        err = errmatches[0].group(0).strip()
                        msg = '%s found while %s'%(quotestring(err), msg)
                        # always report outcome for this case; additionally abort if requested to
                        self.addOutcome(BLOCKED, outcomeReason=msg, abortOnError=abortOnError, callRecord=self.__callRecord())
                        return {} if namedGroupsMode else matches

        if time.time() > startTime + timeout:
            msg = "%s timed out after %d secs, %s"%(msg, timeout,
                ("with %d matches"%len(matches)) if pathexists(f) else 'file does not exist')

            if abortOnError:
                self.abort(TIMEDOUT, msg, self.__callRecord())
            else:
                log.warning(msg, extra=BaseLogFormatter.tag(LOG_TIMEOUTS))
            break

        if processTerminated:
            msg = "%s aborted due to process %s termination"%(msg, process)
            if abortOnError:
                self.abort(BLOCKED, msg, self.__callRecord())
            else:
                log.warning(msg)
            break

        time.sleep(poll)

    if namedGroupsMode:
        return {} if not matches else matches[0].groupdict()
    return matches
def addCleanupFunction(self, fn):
    """Register a zero-argument callable to be run when this object's `cleanup` is invoked.

    `cleanup` runs the registered functions in reverse order of registration (most recently added first),
    before it stops any processes still held by this object. A false value, or a function that has already
    been registered, is ignored.

    e.g. self.addCleanupFunction(lambda: self.cleanlyShutdownProcessX(params))

    :param fn: The function to call during cleanup; it is invoked with no arguments.
    """
    with self.lock:
        if not fn:
            return
        registered = self.__cleanupFunctions
        if fn in registered:
            return
        registered.append(fn)
def cleanup(self):
    """Release the resources managed by this object.

    Runs every function registered with `addCleanupFunction` (most recently registered first), logging
    rather than propagating any exception one of them raises, and then stops any processes in
    ``self.processList`` that are still running and resets ``self.processCount``.

    Should be called exactly once by the owner of this object when it is no longer needed.
    Do not override this method, instead use `addCleanupFunction`.
    """
    try:
        # this method is not documented as thread-safe, but taking the list with a swap
        # under the lock makes it as safe as is practical
        with self.lock:
            pendingFunctions, self.__cleanupFunctions = self.__cleanupFunctions, []

        if pendingFunctions:
            log.info('')
            log.info('cleanup:')

        for cleanupFn in reversed(pendingFunctions):
            try:
                log.debug('Running registered cleanup function: %r'%cleanupFn)
                cleanupFn()
            except Exception:
                # one failing cleanup function must not prevent the others from running
                log.exception('Error while running cleanup function: ')
    finally:
        # processes are stopped even if something above raised
        with self.lock:
            remaining, self.processList = self.processList, []

        for proc in remaining:
            try:
                if proc.running():
                    proc.stop()
            except Exception:
                excType, excValue = sys.exc_info()[:2]
                log.info("caught %s: %s", excType, excValue, exc_info=1)

        self.processCount = {}
        log.debug('ProcessUser cleanup function done.')
def addOutcome(self, outcome, outcomeReason='', printReason=True, abortOnError=False, callRecord=None, override=False):
    """Add a validation outcome (and optionally a reason string) to the validation list.

    The method provides the ability to add a validation outcome to the internal data structure
    storing the list of validation outcomes. Multiple validations may be performed, the current
    supported validation outcomes of which are described in :ref:`assertions-and-outcomes`.

    The outcomes are considered to have a precedence order, as defined by the order of the outcomes listed
    above. Thus a C{pysys.constants.BLOCKED} outcome has a higher precedence than a C{pysys.constants.PASSED}
    outcome. The outcomes are defined in L{pysys.constants}.

    This method is thread-safe.

    Although this method exists on all subclasses of `pysys.process.user.ProcessUser`, in practice only
    `pysys.basetest.BaseTest` subclasses actually do anything with the resulting outcome.

    :param outcome: The outcome to add, e.g. `pysys.constants.FAILED`.
    :param outcomeReason: A string summarizing the reason for the outcome,
        for example "Grep on x.log contains 'ERROR: server failed'".
    :param printReason: If True the specified outcomeReason will be printed
    :param abortOnError: If true abort the test on any error outcome. This should usually be set to
        False for assertions, or the configured `self.defaultAbortOnError` setting (typically True) for
        operations that involve waiting. If None is passed explicitly, `self.defaultAbortOnError` is used.
    :param callRecord: An array of strings indicating the call stack that lead to this outcome. This will be appended
        to the log output for better test triage.
    :param override: Remove any existing test outcomes when adding this one, ensuring
        that this outcome is the one and only one reported even if an existing outcome
        has higher precedence.
    """
    assert outcome in PRECEDENT, outcome # ensure outcome type is known, and that numeric not string constant was specified!
    with self.lock:
        if abortOnError is None: abortOnError = self.defaultAbortOnError
        if outcomeReason is None:
            outcomeReason = ''
        else:
            if PY2 and isinstance(outcomeReason, str):
                # The python2 logger is very unhappy about byte str objects containing
                # non-ascii characters (specifically it will fail to log them and dump a
                # traceback on stderr). Since it's pretty important that assertion
                # messages and test outcome reasons don't get swallowed, add a
                # workaround for this here. Not a problem in python 3.
                outcomeReason = outcomeReason.decode('ascii', errors='replace')
            outcomeReason = outcomeReason.strip().replace(u'\t', u' ').replace('\r','').replace('\n', ' ; ')

        if override:
            log.debug('addOutcome is removing existing outcome(s): %s with reason "%s"', [LOOKUP[o] for o in self.outcome], self.__outcomeReason)
            del self.outcome[:]
            self.__outcomeReason = None

        # remember the overall outcome before adding, so we can tell whether this one changes it
        old = self.getOutcome()
        if (old == NOTVERIFIED and not self.__outcomeReason): old = None
        self.outcome.append(outcome)

        # store the reason of the highest precedent outcome

        # although we should print whatever is passed in, store a version with control characters stripped
        # out so that it's easier to read (e.g. coloring codes from third party tools)
        if self.getOutcome() != old: self.__outcomeReason = re.sub(u'[\x00-\x08\x0b\x0c\x0e-\x1F]', '', outcomeReason)

        if outcome in FAILS and abortOnError:
            if callRecord is None: callRecord = self.__callRecord()
            self.abort(outcome, outcomeReason, callRecord)

        if outcomeReason and printReason:
            if outcome in FAILS:
                if callRecord is None: callRecord = self.__callRecord()
                # Logger.warn is a deprecated alias; warning() is the supported spelling
                log.warning(u'%s ... %s %s', outcomeReason, LOOKUP[outcome].lower(), u'[%s]'%','.join(callRecord) if callRecord is not None else u'',
                    extra=BaseLogFormatter.tag(LOOKUP[outcome].lower(),1))
            else:
                log.info(u'%s ... %s', outcomeReason, LOOKUP[outcome].lower(), extra=BaseLogFormatter.tag(LOOKUP[outcome].lower(),1))
def abort(self, outcome, outcomeReason, callRecord=None):
    """Stop execution by raising an `AbortExecution` exception carrying the given outcome and reason.

    See also L{skipTest}.

    :param outcome: The outcome to report, which will override any existing outcomes previously recorded.
    :param outcomeReason: A string summarizing the reason for the outcome.
    :param callRecord: Optional call record passed through unchanged to the exception (defaults to None).
    :raises AbortExecution: Always.
    """
    raise AbortExecution(outcome, outcomeReason, callRecord)
def skipTest(self, outcomeReason, callRecord=None):
    """Stop execution by raising an `AbortExecution` exception with the SKIPPED outcome, so that
    the rest of the execute() and validate() methods do not execute.

    This is useful when a test should not be executed in the current mode or platform.

    :param outcomeReason: A string summarizing the reason the test is being skipped, for example
        "Feature X is not supported on Windows".
    :param callRecord: Optional call record passed through unchanged to the exception (defaults to None).
    :raises AbortExecution: Always.
    """
    raise AbortExecution(SKIPPED, outcomeReason, callRecord)
def getOutcome(self):
    """Return the overall outcome, chosen by precedence from all the outcomes recorded so far.

    The recorded outcome that appears earliest in `pysys.constants.PRECEDENT` wins; e.g. if
    `pysys.constants.PASSED`, `pysys.constants.BLOCKED` and `pysys.constants.FAILED` were
    recorded during the execution of the test, the overall outcome would be `pysys.constants.BLOCKED`.
    If nothing has been recorded yet the result is `pysys.constants.NOTVERIFIED`.

    The value returned is the integer outcome constant as defined in `pysys.constants`. To convert this
    to a string representation use the `pysys.constants.LOOKUP` dictionary i.e. ``LOOKUP[test.getOutcome()]``.

    :return: The overall outcome
    """
    with self.lock:
        if not self.outcome:
            return NOTVERIFIED
        # the highest-precedence outcome is the one with the smallest index in PRECEDENT
        return min(self.outcome, key=PRECEDENT.index)
def getOutcomeReason(self):
    """Return the reason string stored for the current overall outcome (if specified).

    When a reason is stored and more than one failure outcome has been recorded, the number of
    additional failures is appended to the reason.

    :return: The overall test outcome reason or '' if not specified
    :rtype: string
    """
    with self.lock:
        reason = self.__outcomeReason
        failureCount = sum(1 for o in self.outcome if o in FAILS)
        if reason and failureCount > 1:
            return u'%s (+%d other failures)'%(reason, failureCount-1)
        return reason
def getNextAvailableTCPPort(self, hosts=['', 'localhost'], socketAddressFamily=socket.AF_INET):
    """Allocate a free TCP port which can be used for starting a server on this machine.

    The port is taken from the pool of available server (non-ephemeral) ports on this machine, and will not
    be available for use by any other code in the current PySys process until this object's `cleanup` method is
    called to return it to the pool of available ports.

    To allocate an IPv4 port for use only on this host::

        port = self.getNextAvailableTCPPort(hosts=['localhost'])

    .. versionchanged:: 1.5.1
        Added hosts and socketAddressFamily parameters.

    :param list(Str) hosts: A list of the host names or IP addresses to check when establishing that a potential
        allocated port isn't already in use by a process outside the PySys framework.
        By default we check ``""`` (which corresponds to ``INADDR_ANY`` and depending on the OS means
        either one or all non-localhost IPv4 addresses) and also ``localhost``.

        Many machines have multiple network cards each with its own host IP address, and typically you'll only be using
        one of them in your test, most commonly ``localhost``. If you do know which host/IP you'll actually be using,
        just specify that directly to save time, and avoid needlessly opening remote ports on hosts you're not using.
        A list of available host addresses can be found from
        ``socket.getaddrinfo('', None)``.

    :param socketAddressFamily: The socket address family e.g. IPv4 vs IPv6. See Python's ``socket`` module for
        details.
    :return: The allocated port number.
    """
    # Pass the caller's choices through to the allocator; without this the documented parameters would be
    # silently ignored and the allocator's own defaults always used.
    # NOTE(review): assumes TCPPortOwner accepts hosts= and socketAddressFamily= keywords - confirm in pysys.utils.allocport
    o = TCPPortOwner(hosts=hosts, socketAddressFamily=socketAddressFamily)
    # release the port when this object is cleaned up
    self.addCleanupFunction(lambda: o.cleanup())
    return o.port
def __callRecord(self):
    """Build a call record of the frames outside this module and the BaseTest module, walking
    outwards until the execute or validate method of the test case module is reached.

    :return: A list of ``"filename:lineno"`` strings ending at the test's execute/validate frame,
        or None if this object is not a BaseTest or no such frame is found on the stack.
    """
    from pysys.basetest import BaseTest
    if not isinstance(self, BaseTest):
        return None

    frames = []
    for record in inspect.stack():
        info = inspect.getframeinfo(record[0])

        # frames belonging to the framework classes themselves are of no interest to the test author
        if self.__skipFrame(info.filename, ProcessUser) or self.__skipFrame(info.filename, BaseTest):
            continue

        frames.append('%s:%s' % (os.path.basename(info.filename).strip(), info.lineno))

        # stop once we reach execute()/validate() in the test's own module (compared ignoring the extension)
        inTestModule = os.path.splitext(info.filename)[0] == os.path.splitext(os.path.join(self.descriptor.testDir, self.descriptor.module))[0]
        if inTestModule and info.function in ('execute', 'validate'):
            return frames
    return None
def __skipFrame(self, file, clazz):
    """Private method to check whether a file is the source file of the module defining a particular class.

    The comparison ignores the file extension, so a ``.py`` and ``.pyc`` of the same module match.

    :param file: The file path to check
    :param clazz: The class to check against
    :return: True if the file (minus extension) is the file of the module in which clazz is defined.
    """
    definingModule = sys.modules[clazz.__module__]
    moduleBase, _ = os.path.splitext(definingModule.__file__)
    fileBase, _ = os.path.splitext(file)
    return fileBase == moduleBase
def getExprFromFile(self, path, expr, groups=[1], returnAll=False, returnNoneIfMissing=False, encoding=None, reFlags=0):
    """Search the specified file line by line for a regular expression and return what it matched.

    When the regex contains groups, the group(s) selected by ``groups`` are returned; when it contains none,
    the whole matched text is returned. If the expression is not found an exception is raised,
    unless returnAll=True or returnNoneIfMissing=True. For example::

        self.getExprFromFile('test.txt', 'myKey="(.*)"') # on a file containing 'myKey="foobar"' would return "foobar"
        self.getExprFromFile('test.txt', 'foo') # on a file containing 'myKey=foobar' would return "foo"

    See also `pysys.basetest.BaseTest.assertGrep` which should be used when instead of just finding out what's
    in the file you want to assert that a specific expression is matched. assertGrep also provides some additional
    functionality such as returning named groups which this method does not currently support.

    :param str path: file to search (located in the output dir unless an absolute path is specified)
    :param str expr: the regular expression, optionally containing the regex group operator ``(...)``
    :param List[int] groups: which regex group numbers (as indicated by brackets in the regex) should be returned;
        default is ``[1]`` meaning the first group.
        If more than one group is specified, the result will be a tuple of group values, otherwise the
        result will be the value of the group at the specified index as a str.
    :param bool returnAll: returns a list containing all matching lines if True, the first matching line otherwise.
    :param bool returnNoneIfMissing: set this to return None instead of throwing an exception
        if the regex is not found in the file
    :param str encoding: The encoding to use to open the file.
        The default value is None which indicates that the decision will be delegated
        to the L{getDefaultFileEncoding()} method.
    :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching,
        combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``.
        For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot
        be used because expressions are matched against one line at a time. Added in PySys 1.5.1.

    :return: A List[List[str]] if returnAll=True and groups contains multiple groups, a List[str] if only one of
        those conditions is true, or else a simple str containing just the first match found.
    :raises Exception: If nothing matched and neither returnAll nor returnNoneIfMissing was set.
    """
    fullpath = os.path.join(self.output, path)
    found = []
    with openfile(fullpath, 'r', encoding=encoding or self.getDefaultFileEncoding(fullpath)) as f:
        for line in f:
            m = re.search(expr, line, flags=reFlags)
            if m is None:
                continue

            # with groups in the regex return the requested group(s), otherwise the whole match
            value = m.group(*groups) if m.groups() else m.group(0)
            if not returnAll:
                return value
            found.append(value)

    if returnAll:
        return found
    if returnNoneIfMissing:
        return None
    raise Exception('Could not find expression %s in %s'%(quotestring(expr), os.path.basename(path)))
def logFileContents(self, path, includes=None, excludes=None, maxLines=20, tail=False, encoding=None, logFunction=None, reFlags=0):
    """Logs some or all of the lines in the specified file.

    If the file does not exist or cannot be opened, does nothing. The method is useful for providing key
    diagnostic information (e.g. error messages from tools executed by the test) directly in run.log, or
    to make test failures easier to triage quickly.

    .. versionchanged:: 1.5.1
        Added logFunction parameter.

    :param str path: May be an absolute, or relative to the test output directory
    :param list[str] includes: Optional list of regex strings. If specified, only matches of these regexes will be logged
    :param list[str] excludes: Optional list of regex strings. If specified, no line containing these will be logged
    :param int maxLines: Upper limit on the number of lines from the file that will be logged. Set to zero for unlimited
    :param bool tail: Prints the _last_ 'maxLines' in the file rather than the first 'maxLines'
    :param str encoding: The encoding to use to open the file.
        The default value is None which indicates that the decision will be delegated
        to the L{getDefaultFileEncoding()} method.
    :param Callable[[line],None] logFunction: The function that will be used to log individual lines from the file.
        Usually this is ``self.log.info(u' %s', line, extra=BaseLogFormatter.tag(LOG_FILE_CONTENTS))``
        but a custom implementation can be provided, for example to provide a different color using
        `pysys.utils.logutils.BaseLogFormatter.tag`.
    :param int reFlags: Zero or more flags controlling how the behaviour of regular expression matching,
        combined together using the ``|`` operator, for example ``reFlags=re.VERBOSE | re.IGNORECASE``.
        For details see the ``re`` module in the Python standard library. Note that ``re.MULTILINE`` cannot
        be used because expressions are matched against one line at a time. Added in PySys 1.5.1.

    :return: True if anything was logged, False if not.
    """
    if not path: return False
    actualpath = os.path.join(self.output, path)
    try:
        # always open with a specific encoding not in bytes mode, since otherwise we can't reliably pass the read lines to the logger
        f = openfile(actualpath, 'r', encoding=encoding or self.getDefaultFileEncoding(actualpath) or locale.getpreferredencoding(), errors='replace')
    except Exception as e:
        self.log.debug('logFileContents cannot open file "%s": %s', actualpath, e)
        return False

    def matchesany(s, regexes):
        # returns the text matched by the first regex that matches s, or None if none of them do
        assert not isstring(regexes), 'must be a list of strings not a string'
        for x in regexes:
            m = re.search(x, s, flags=reFlags)
            if m: return m.group(0)
        return None

    tolog = []
    with f:
        for l in f:
            l = l.rstrip()
            if not l: continue
            if includes:
                # when includes are given, only the matched portion of the line is logged
                l = matchesany(l, includes)
                if not l: continue
            if excludes and matchesany(l, excludes): continue

            tolog.append(l)
            if maxLines:
                if not tail and len(tolog) == maxLines:
                    tolog.append('...')
                    break
                if tail and len(tolog) == maxLines+1:
                    # keep only the most recent maxLines entries
                    del tolog[0]

    if not tolog:
        return False

    logextra = BaseLogFormatter.tag(LOG_FILE_CONTENTS)
    if logFunction is None:
        def logFunction(line):
            # log the argument itself, rather than relying on a loop variable captured from the enclosing scope
            self.log.info(u' %s', line, extra=logextra)

    self.log.info(u'Contents of %s%s: ', os.path.normpath(path), ' (filtered)' if includes or excludes else '', extra=logextra)
    for l in tolog:
        logFunction(l)
    self.log.info(' -----', extra=logextra)
    self.log.info('', extra=logextra)
    return True
def mkdir(self, path):
    """
    Create a directory, recursively creating any missing parent directories.

    This function does nothing (and does not raise an exception) if the
    directory already exists.

    :param path: The directory to create. This can be an absolute path or
        relative to the testcase output directory.
    :return: the absolute path of the new directory, to facilitate fluent-style method calling.
    """
    # os.path.join() leaves an absolute path untouched, otherwise it anchors it at self.output
    absolutePath = os.path.join(self.output, path)
    # inside a method body this name resolves to the module-level helper imported
    # from pysys.utils.fileutils, not to this method
    mkdir(absolutePath)
    return absolutePath
def deletedir(self, path, **kwargs):
    # Lower-case alias for deleteDir(): forwards the path and every keyword
    # argument unchanged, and hands back whatever deleteDir() returns.
    return self.deleteDir(path, **kwargs)
def deleteDir(self, path, **kwargs):
    """
    Recursively delete the specified directory.

    Does nothing if the directory does not exist. Raises an exception if the
    deletion fails.

    :param path: The path to be deleted. This can be an absolute path or
        relative to the testcase output directory.
    :param kwargs: Any additional keyword arguments are passed through to
        `pysys.utils.fileutils.deletedir`.
    """
    # an absolute path passes through os.path.join() unchanged; a relative one is
    # resolved against the output directory
    target = os.path.join(self.output, path)
    # module-level helper imported from pysys.utils.fileutils
    deletedir(target, **kwargs)
def getDefaultFileEncoding(self, file, **xargs):
    """
    Specifies what encoding should be used to read or write the specified
    text file.

    PySys calls this method to select an encoding whenever it needs to open a
    file, for example to wait for a signal, for a file-based assertion, or to
    write a file with replacements.

    Many methods allow the encoding to be overridden for just that call; this
    method exists so that global defaults can be chosen based on the filename.
    For example, it could be overridden to specify that utf-8 is to be used
    for opening filenames ending in .xml, .json and .yaml.

    The default implementation uses pysysproject.xml configuration rules
    such as::

        <default-file-encoding pattern="*.xml" encoding="utf-8"/>

    Each rule's pattern is compared case-insensitively, using ``fnmatch``
    wildcards, against both the whole path (with backslashes converted to
    forward slashes) and the basename alone; the first rule that matches wins.

    A return value of None indicates default behaviour, which on Python 3 is to
    use the default encoding, as specified by python's
    ``locale.getpreferredencoding()``, and on Python 2 is to use binary ``str``
    objects with no character encoding or decoding applied.

    :param file: The filename to be read or written. This may be an
        absolute path or a relative path.
    :param xargs: Ensure that an ``**xargs`` argument is specified so that
        additional information can be passed to this method in future releases.
    :return: The encoding to use for this file, or None if default behaviour is
        to be used.
    """
    # normalize slashes and ignore case
    normalized = file.lower().replace('\\', '/')
    basename = os.path.basename(normalized)

    for rule in self.project.defaultFileEncodings:
        pattern = rule['pattern'].lower()
        matchesWholePath = fnmatch.fnmatchcase(normalized, pattern)
        # first match wins
        if matchesWholePath or fnmatch.fnmatchcase(basename, pattern):
            return rule['encoding']
    return None
@staticmethod
def compareVersions(v1, v2):
    """ Compares two alphanumeric dotted version strings to see which is more recent.

    Example usage::

        if self.compareVersions(thisversion, '1.2.alpha-3') > 0:
            ... # thisversion is newer than 1.2.alpha-3

    The comparison ignores case and normalizes the separators ./-/_
    so that ``'1.alpha2'=='1Alpha2'``. String components are compared
    lexicographically with other string components; when a string component
    is compared to a number, the string is always considered greater.

    >>> ProcessUser.compareVersions('10-alpha5.dev10', '10alpha-5-dEv_10') == 0 # normalization of case and separators
    True
    >>> ProcessUser.compareVersions(b'1....alpha.2', u'1Alpha2') == 0 # ascii byte and unicode strings both supported
    True
    >>> ProcessUser.compareVersions('1.2.0', '1.2')
    0
    >>> ProcessUser.compareVersions('1.02', '1.2')
    0
    >>> ProcessUser().compareVersions('1.2.3', '1.2') > 0
    True
    >>> ProcessUser.compareVersions('1.2', '1.2.3')
    -1
    >>> ProcessUser.compareVersions('10.2', '1.2')
    1
    >>> ProcessUser.compareVersions('1.2.text', '1.2.0') # letters are > numbers
    1
    >>> ProcessUser.compareVersions('1.2.text', '1.2') # letters are > numbers
    1
    >>> ProcessUser.compareVersions('10.2alpha1', '10.2alpha')
    1
    >>> ProcessUser.compareVersions('10.2dev', '10.2alpha') # letters are compared lexicographically
    1
    >>> ProcessUser.compareVersions('', '')
    0
    >>> ProcessUser.compareVersions('1', '')
    1

    :param v1: A string containing a version number, with any number of components.
    :param v2: A string containing a version number, with any number of components.
    :return: an integer > 0 if v1>v2,
        an integer < 0 if v1<v2,
        or 0 if they are semantically the same.
    """
    # Inside the method body this name resolves to the module-level function imported
    # from pysys.utils.stringutils (class attributes are not in a method's scope),
    # so this is plain delegation rather than recursion.
    ordering = compareVersions(v1, v2)
    return ordering
def write_text(self, file, text, encoding=None):
    """
    Writes the specified characters to a file in the output directory.

    :param file: The path of the file to write, either an absolute path or
        relative to the `self.output` directory.
    :param text: The string to write to the file, with `\\n`
        for newlines (do not use `os.linesep` as the file will be opened in
        text mode so platform line separators will be added automatically).

        On Python 3 this must be a character string.

        On Python 2 this can be a character or byte string containing ASCII
        characters. If non-ASCII characters are used, it must be a unicode
        string if there is an encoding specified for this file/type, or
        else a byte string.
    :param encoding: The encoding to use to open the file.
        The default value is None which indicates that the decision will be delegated
        to the `getDefaultFileEncoding()` method.
    """
    # This method provides similar functionality to the Python3 pathlib write_text method.
    targetPath = os.path.join(self.output, file)
    # an explicit encoding wins; otherwise the default is looked up using the
    # path exactly as the caller supplied it
    chosenEncoding = encoding or self.getDefaultFileEncoding(file)
    with openfile(targetPath, 'w', encoding=chosenEncoding) as out:
        out.write(text)
def copy(self, src, dest, mappers=None, encoding=None):
    """Copy a single text or binary file, optionally transforming the
    contents by filtering each line through a list of mapping functions.

    If any mappers are provided, the file is copied in text mode and
    each mapper is given the chance to modify or omit each line.
    If no mappers are provided, the file is copied in binary mode.

    In addition to the file contents the mode is also copied, for example
    the executable permission will be retained.

    This function is useful both for creating a modified version of an
    output file that's more suitable for later validation steps such as
    diff-ing, and also for copying required files from the input to the
    output directory.

    For example::

        self.copy('output-raw.txt', 'output-processed.txt', encoding='utf-8',
            mappers=[
                lambda line: None if ('Timestamp: ' in line) else line,
                lambda line: line.replace('foo', 'bar'),
            ])

    :param src: The source filename, which can be an absolute path, or
        a path relative to the ``self.output`` directory.
        Use ``src=self.input+'/myfile'`` if you wish to copy a file from the test
        input directory.
    :param dest: The destination filename, which can be an absolute path, or
        a path relative to the ``self.output`` directory. If this is a directory
        name, the file is copied to this directory with the same basename as src.
    :param mappers: An iterable of filter functions that will be applied,
        in order, to each line read from the file. Each function accepts a string for
        the current line as input and returns either a string to write or
        None if the line is to be omitted. None or an empty list (the default)
        means no mappers, i.e. a binary copy.
    :param encoding: The encoding to use to open the file.
        The default value is None which indicates that the decision will be delegated
        to the `getDefaultFileEncoding()` method.
    :return: the absolute path of the destination file.
    :raises AssertionError: if src and dest resolve to the same path.
    """
    src = toLongPathSafe(os.path.join(self.output, src))
    dest = toLongPathSafe(os.path.join(self.output, dest))
    if os.path.isdir(dest): dest = toLongPathSafe(dest+'/'+os.path.basename(src))

    # Raised explicitly rather than via an assert statement so the guard is still
    # enforced under "python -O"; without it the mapper branch below would truncate
    # the source file when opening it for writing. The exception type is unchanged.
    if src == dest:
        raise AssertionError('Source and destination file cannot be the same')

    # Materialize once: the mappers are re-applied to every line, so a one-shot
    # iterable (e.g. a generator) would otherwise be exhausted after the first line.
    mappers = list(mappers) if mappers else []

    if not mappers:
        # simple binary copy
        shutil.copyfile(src, dest)
    else:
        with openfile(src, 'r', encoding=encoding or self.getDefaultFileEncoding(src)) as srcf:
            with openfile(dest, 'w', encoding=encoding or self.getDefaultFileEncoding(dest)) as destf:
                for line in srcf:
                    for mapper in mappers:
                        line = mapper(line)
                        # a mapper returning None drops the line; later mappers are not called
                        if line is None: break
                    if line is not None: destf.write(line)

    # carry the permission bits (e.g. the executable flag) over to the copy
    shutil.copymode(src, dest)
    return dest