Source code for pysys.process

#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2022 M.B. Grieve

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


"""
Process execution and monitoring implementations. 
"""

from pysys.constants import *

# set the modules to import when imported the pysys.process package
__all__ = [ "helper",
			"monitor", 
			"monitorimpl",
			"user", 
			"Process"]

# add to the __path__ to import the platform specific process.helper module
dirname = __path__[0]
if IS_WINDOWS:
	__path__.append(os.path.join(dirname, "plat-win32"))
else:
	__path__.append(os.path.join(dirname, "plat-unix"))


import os.path, time, threading, sys, locale
import logging
import shlex
import queue as Queue

from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.pycompat import *
from pysys.internal.initlogging import pysysLogHandler

log = logging.getLogger('pysys.process')

[docs]class Process(object): """Represents a process that PySys has started (or can start). A platform-specific implementation subclass of this interface is returned by `pysys.process.user.ProcessUser.startProcess`. :ivar str ~.command: The full path to the executable. :ivar list[str] ~.arguments: A list of arguments to the command. :ivar dict(str,str) ~.environs: A dictionary of environment variables (key, value) for the process context execution. :ivar str ~.workingDir: The working directory for the process :ivar ~.state: The state of the process. :vartype state: `pysys.constants.FOREGROUND` or `pysys.constants.BACKGROUND` :ivar int ~.timeout: The time in seconds for a foreground process to complete. :ivar str ~.stdout: The full path to the filename to write the stdout of the process, or None for no stderr stream. :ivar str ~.stderr: The full path to the filename to write the stderr of the process, or None for no stderr stream. :ivar str ~.displayName: Display name for this process (defaults to the basename if not explicitly specified). The display name is returned by calling ``str()`` on this instance. The display name and pid are returned by ``repr()``. :ivar str expectedExitStatus: The condition string used to determine whether the exit status/code returned by the process is correct, for example '==0'. :ivar int ~.pid: The process id for a running or complete process (as set by the OS), or None if it is not yet started. :ivar int ~.exitStatus: The process exit status for a completed process (for many processes 0 represents success), or None if it has not yet completed. :ivar dict[str,obj] ~.info: A mutable dictionary of user-supplied information that was passed into startProcess, for example port numbers, log file paths etc. """ def __init__(self, command, arguments, environs, workingDir, state, timeout, stdout=None, stderr=None, displayName=None, expectedExitStatus=None, info={}): self.displayName = displayName if displayName else os.path.basename(command) self.info = info self.command = os.path.normpath(command) if any(not isstring(arg) for arg in arguments): arguments = [str(arg) for arg in arguments] self.arguments = arguments self.environs = {} for key in environs: self.environs[key] = environs[key] self.workingDir = os.path.normpath(workingDir) self.state = state self.timeout = timeout self.expectedExitStatus = expectedExitStatus # 'publicly' available data attributes set on execution self.pid = None self.exitStatus = None # these may be further updated by the subclass self.stdout = stdout self.stderr = stderr # catch these common mistakes assert os.path.isdir(self.workingDir), 'Working directory for %s does not exist: %s'%(self.displayName, self.workingDir) if self.stdout: assert os.path.dirname(self.stdout)==self.workingDir or os.path.isdir(os.path.dirname(self.stdout)), 'Parent directory for stdout does not exist: %s'%self.stdout # print process debug information debuginfo = [] if IS_WINDOWS or not hasattr(shlex, 'quote'): quotearg = lambda c: '"%s"'%c if ' ' in c else c else: quotearg = shlex.quote debuginfo.append(" command line : %s"%' '.join(quotearg(c) for c in [self.command]+self.arguments)) for i, a in enumerate(self.arguments): debuginfo.append(" arg #%-2d : %s"%( i+1, a) ) debuginfo.append(" working dir : %s"% self.workingDir) if IS_WINDOWS and len(self.workingDir) > 256-30: debuginfo.append(" NB: length of working dir is %d (Windows MAX_PATH limit is 256 chars)" % len(self.workingDir)) debuginfo.append(" stdout : %s"% stdout) debuginfo.append(" stderr : %s"% stderr) keys=list(self.environs.keys()) keys.sort() for e in keys: value = self.environs[e] debuginfo.append(" environment : %s=%s"%( e, value) ) if 'PATH' in e.upper() and e.upper() not in ['PATHEXT']: # it's worth paths/classpaths/pythonpaths as they're often long and quite hard to spot differences otherwise pathelements = value.split(';' if ';' in value else os.pathsep) if len(pathelements)>1: for i, pathelement in enumerate(pathelements): # : ABC=def debuginfo.append(" #%-2d %s"%( i+1, pathelement)) if info: debuginfo.append(" info : %s"% info) log.debug("Process parameters for %s\n%s", self, '\n'.join(d for d in debuginfo)) # private self._outQueue = None def __str__(self): return self.displayName def __repr__(self): return '%s (pid %s)'%(self.displayName, self.pid) # these abstract methods must be implemented by subclasses; no need to publically document def setExitStatus(self): raise Exception('Not implemented') def startBackgroundProcess(self): raise Exception('Not implemented')
[docs] def stop(self, timeout=TIMEOUTS['WaitForProcessStop'], hard=False): """Stop a running process and wait until it has finished. Does nothing if the process is not running. On Windows, this uses TerminateProcess, on Linux this sends a SIGTERM signal (which allows the process a chance to exit gracefully including possibly dumping code coverage output) unless the ``hard=True`` parameter is specified. :param bool hard: Set to True to use a hard termination (e.g. SIGKILL). :param float timeout: The time to wait for the process to complete before raising an exception. @raise pysys.exceptions.ProcessError: Raised if an error occurred whilst trying to stop the process. """ raise Exception('Not implemented')
[docs] def signal(self, signal): """Send a signal to a running process. Typically this uses ``os.kill`` to send the signal. :param int signal: The integer signal to send to the process, e.g. ``process.signal(signal.SIGTERM)``. @raise pysys.exceptions.ProcessError: Raised if an error occurred whilst trying to signal the process """ try: os.kill(self.pid, signal) except Exception: raise ProcessError("Error sending signal %s to process %r"%(signal, self))
[docs] def write(self, data, addNewLine=True, closeStdinAfterWrite=False): """Write binary data to the stdin of the process. Note that when the addNewLine argument is set to true, if a new line does not terminate the input data string, a newline character will be added. If one already exists a new line character will not be added. Should you explicitly require to add data without the method appending a new line charater set addNewLine to false. :param bytes|str data: The data to write to the process stdin. As only binary data can be written to a process stdin, if a character string rather than a byte object is passed as the data, it will be automatically converted to a bytes object using the encoding given by ``PREFERRED_ENCODING``. :param bool addNewLine: True if a new line character is to be added to the end of the data string :param bool closeStdinAfterWrite: If True, the stdin file handle will be closed after this write. Added in v2.1. """ if not self.running(): raise Exception('Cannot write to process stdin when it is not running') if data is None: return if type(data) != binary_type: data = data.encode(PREFERRED_ENCODING) if addNewLine and not data.endswith(b'\n'): data = data+b'\n' if self._outQueue == None: # start thread on demand self._outQueue = Queue.Queue() __parentLogHandlers = pysysLogHandler.getLogHandlersForCurrentThread() def writeStdinThread(): pysysLogHandler.setLogHandlersForCurrentThread(__parentLogHandlers) try: while self._outQueue: try: data = self._outQueue.get(block=True, timeout=0.25) except Queue.Empty: if not self.running(): # no need to close stdin here, as previous call's setExitCode() method will do it break else: try: self._writeStdin(data) except Exception as ex: (log.debug if not self.running() else log.error)('Failed to write %r to stdin of process %r', data, self, exc_info=True) finally: pysysLogHandler.setLogHandlersForCurrentThread([]) t = threading.Thread(target=writeStdinThread, name='pysys.stdinreader_%s'%str(self), daemon=True) t.start() if data: self._outQueue.put(data) if closeStdinAfterWrite: self._outQueue.put(None) # None is a sentinel value for EOF
[docs] def running(self): """Check to see if a process is running. :return: True if the process is currently running, False if not. """ return self.setExitStatus() is None
[docs] def wait(self, timeout): """Wait for a process to complete execution, raising an exception on timeout. This method provides basic functionality but does not check the exit status or log any messages; see `pysys.basetest.BaseTest.waitProcess` for a wrapper that adds additional functionality. Note that this method will not terminate the process if the timeout is exceeded. :param timeout: The timeout to wait in seconds, for example ``timeout=TIMEOUTS['WaitForProcess']``. :raise pysys.exceptions.ProcessTimeout: Raised if the timeout is exceeded. """ assert timeout > 0, 'timeout must always be specified' startTime = time.time() log.debug("Waiting up to %d secs for process %r", timeout, self) while self.running(): currentTime = time.time() if currentTime > startTime + timeout: raise ProcessTimeout('Waiting for completion of %r timed out after %d seconds'%(self, int(timeout))) time.sleep(0.05)
[docs] def start(self): """Start a process using the runtime parameters set at instantiation. @raise pysys.exceptions.ProcessError: Raised if there is an error creating the process @raise pysys.exceptions.ProcessTimeout: Raised in the process timed out (foreground process only) """ self._outQueue = None # always reset if self.workingDir and not os.path.isdir(self.workingDir): raise Exception('Cannot start process %s as workingDir "%s" does not exist'% (self, self.workingDir)) if self.state == FOREGROUND: self.startBackgroundProcess() self.wait(self.timeout) else: self.startBackgroundProcess()