#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC). All rights reserved
# Copyright (c) 2013,2015-2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
#
# This module contains internal implementation utility code for pysys-apama,
# which is unsupported and may be removed at any time.
#
"""
Contains PySys extensions required by IAF and correlator support.
"""
# Deprecated and/or not in publc API: XArgsHolder, _initProject, _allocateUniqueProcessStdOutErr, stringToUnicode
import time
from pysys import log
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.pycompat import *
from pysys.process.helper import ProcessWrapper # only used when we need special handling, startProcess is usually better
def _initProject(project):
"""
Internal-only, do not use.
"""
try:
assert project.APAMA_HOME
assert project.APAMA_WORK
except Exception:
raise Exception("Not running inside an Apama environment. APAMA_HOME and APAMA_WORK not set")
[docs]class ApamaServerProcess(object):
"""Abstract parent helper class for Apama server processes.
:ivar ~.parent: Reference to the PySys pysys.basetest.BaseTest testcase (or pysys.process.user.ProcessUser) instantiating this instance.
:ivar int ~.port: Port used for starting and interaction with the process. If no port parameter is used in the
argument list an available port will be dynamically found from
the OS and used for starting the process, and performing all operations against it.
:ivar str ~.host: Hostname for interaction with a remote process. The host
parameter is only used to perform operations against a remote process started externally to the
PySys framework - the class does not support the starting of a remote process.
:ivar dict(str,str) ~.environ: The environment for running the process
"""
def __init__(self, parent, name, port=None, host=None):
self.parent = parent
self.process = None # only ever valid if started locally
_initProject(self.parent.project)
def cleanup():
try:
if self.process and self.process.running(): # only if started by us
self.shutdown(ignoreExitStatus=True)
except Exception as e:
# for now don't actually cause a failure if this happens; could add that as an option later if it's useful
self.parent.log.warning('Failed to shutdown %s: %s', self, e)
if self.parent.project.getProperty('shutdownApamaComponentsAfterTest', True):
parent.addCleanupFunction(cleanup)
self.port = int(port) if port else port
self.host = host
if self.port == None: self.port = parent.getNextAvailableTCPPort()
if self.host == None: self.host = "localhost"
self.environ = {}
self.log = self.parent.log
for key in os.environ: self.environ[stringToUnicode(key)] = stringToUnicode(os.environ[key])
assert self.parent.project, 'project is not set - PySys environment not configured correctly'
_initProject(self.parent.project)
self.name = name
self.componentType = 'unknown'
[docs] def waitForComponentUp(self, timeout=TIMEOUTS['WaitForSocket'], **xargs):
"""Block until the the component declares itself to be ready for processing.
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'component_management')
# set the default stdout and stderr
stdout, stderr = self.parent.allocateUniqueStdOutErr('waitForComponentUp_%s'%self.name)
# set the arguments to the process
arguments = []
#arguments.append("-v")
arguments.append("-w")
arguments.extend(["-p", "%d" % self.port])
arguments.extend(["-n", self.host])
waitprocess = ProcessWrapper(command, arguments, self.environ, self.parent.output, BACKGROUND, timeout, stdout, stderr, displayName='waitForComponentUp %s'%self)
waitprocess.start()
try:
endtime = time.time()+timeout
while time.time() < endtime and waitprocess.running():
# if the component was started locally, then early-out if it has failed
if self.process and not self.process.running():
self.parent.addOutcome(BLOCKED, '%s failed with exit code %d in waitForComponentUp'%(self.process, self.process.exitStatus), abortOnError=self.parent.defaultAbortOnError)
return # in case it doesn't abort
time.sleep(0.05)
if waitprocess.running():
self.parent.addOutcome(TIMEDOUT, 'Timed out waiting for %s to come up (after %d secs)'%(self, timeout), abortOnError=self.parent.defaultAbortOnError)
elif waitprocess.exitStatus != 0:
# shouldn't really happen
self.parent.addOutcome(BLOCKED, 'component_management failed while waiting for %s to come up, with exit status %d'%
(self, waitprocess.exitStatus), abortOnError=self.parent.defaultAbortOnError)
else:
# a short wait is needed to detect port in use startup failures where the component is already running on the port,
# to give time for the process to try and fail to bind the port.
# set it to 0 by default to keep testcases running as fast as possible, but
# may be set to a higher number for deployments
sltime = float(getattr(self.parent.project, 'APAMA_START_COMPONENT_PORT_IN_USE_SLEEP_SECS', '0'))
if sltime > 0:
time.sleep(sltime)
# in case a different component was started on this port from the local one we wanted
if self.process and not self.process.running():
self.parent.addOutcome(BLOCKED, '%s failed with exit code %d during waitForComponentUp'%(self.process, self.process.exitStatus), abortOnError=self.parent.defaultAbortOnError)
self.parent.log.info(" Component %s is now running", self)
finally:
if waitprocess.running(): waitprocess.stop()
[docs] def shutdown(self, message='Shutdown requested by test', timeout=60, **args):
""" Requests a clean shutdown of the component.
If it was started by this object, also waits for the process to
terminate, and silently ignores requests to shutdown if the process
was already stopped.
"""
if self.process and not self.process.running(): return
self.manage(arguments=['--shutdown', message], timeout=timeout, displayName='shutdown', **args)
if self.process:
self.process.wait(timeout=timeout)
[docs] def running(self):
""" Returns True if this has a local process that is running, or
False if it has stopped, or was not started by this object.
"""
return self.process and self.process.running()
def __repr__(self): return '%s<%s%s:%d>'%(
self.name,
('pid %s on '%self.process.pid) if (self.process and self.process.pid) else '',
self.host, self.port
)
[docs] def manage(self, arguments=[], displayName="manage", **xargs):
"""Execute component_management operations against the process.
:param arguments: The arguments to be passed to component_management
:param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'component_management')
assert not isstring(arguments) # cannot pass a string in here
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, displayName)
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, arguments=arguments, project=self.parent.project)
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if xargs.arguments: arguments.extend(xargs.arguments)
if displayName == 'manage':
displayName='%s <%s> [%s]'%(displayName, self.name, ' '.join(xargs.arguments))
if '-s' in xargs.arguments or '--shutdown' in xargs.arguments:
displayName = 'manage <%s> --shutdown'%self.name # keep it concise
else:
displayName = '%s <%s>'%(displayName, self.name)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
def _allocateUniqueProcessStdOutErr(parent, processKey):
""" Allocate filenames of the form processKey.out[.n] (similarly for .err)
for a process that is about to be started, such that the names are not
repeated within the specified parent's lifetime.
Not for customer use, and legacy only - the pysys allocateUniqueStdOutErr method is recommended instead.
Returns a tuple of (stdout, stderr).
"""
# NB: use this only for existing methods (backwards compat) - for anything new the
# pysys 1.1 parent.allocateUniqueStdOutErr method is better, but as it uses a different format
# can't use it out fo the box
if not hasattr(parent, '_apama_uniqueProcessKeys'): parent._apama_uniqueProcessKeys = {}
newval = parent._apama_uniqueProcessKeys.get(processKey, -1)+1
parent._apama_uniqueProcessKeys[processKey] = newval
suffix = '.%d'%newval if newval > 0 else ''
return (
os.path.join(parent.output, processKey+'.out'+suffix),
os.path.join(parent.output, processKey+'.err'+suffix),
)
def stringToUnicode(s):
"""
:deprecated: Internal helper function: do not use, will be removed in a future release.
:meta private:
Converts a unicode string or a utf-8 bit string into a unicode string.
"""
if not PY2: return s
if isinstance(s, unicode):
return s
else:
return unicode(s, "utf8")
class XArgsHolder:
"""
:deprecated: Internal helper class: do not use, will be removed in a future release.
:meta private:
Class to store all supported xargs method arguments used in the helper classes.
Methods in the Apama helper classes define process related methods with a signature including
named parameters for the most commonly used options to the process, and an **xargs parameter to allow
passing through of additional supported named parameters, e.g. workingDir, state, timeout, stdout, stderr and
arguments. The XArgsHolder class takes the **xargs parameter from a method call (which is treated by
python as a dictionary of name value pairs) and default values for the workingDir, state, timeout, stdout, stderr
and arguments; these are used to set data attributes to the class instance with the default values. The class then
iterates over the **xargs and over-writes the default values if they exist in the parameter. This allows a
user of the class to create an instance to hold the additional arguments with default values in the first
case, but for these to be replaced if an alternative value is supplied via **xargs, e.g. the user of the method wants
to explicitly set the sdtout etc.
The kwargs member contains a dictionary that can be passed through to startProcess using **kwargs,
containing every setting except for arguments which should always be handled directly.
Typical usage would be startProcess(arguments=xargs.arguments, **xargs.kwargs)
"""
def __init__(self, xargs, workingDir=None, state=FOREGROUND, timeout=TIMEOUTS['WaitForProcess'], stdout=None, stderr=None, arguments=[], ignoreExitStatus=None, project=None):
"""Create an instance of the XArgsHolder class.
:param xargs: The variable argument list passed into the method, which override the defaults provided by the other arguments
:param workingDir: The default value for the working directory of a process
:param state: The default state of the process (L{pysys.constants.BACKGROUND} | L{pysys.constants.FOREGROUND})
:param timeout: The default value of the process timeout
:param stdout: The default value of the process stdout
:param stderr: The default value of the process stderr
:param arguments: Extra command line arguments to be passed to the process.
:param ignoreExitStatus: Set to True to change default behaviour for this process to prevent
non-zero exit code being treated as a test failure. The default is normally False. The project setting
defaultApamaIgnoreExitStatus can be used to control this globally.
"""
# this allows arbitrary extra args to be passed to startProcess e.g. abortOnError, anything we add in future
self.kwargs = dict(xargs)
# set kwargs and members for known keywords. Give priority to xargs provided if specified.
self.workingDir = self.kwargs['workingDir'] = xargs.get('workingDir', workingDir)
self.state = self.kwargs['state'] = xargs.get('state', state)
self.timeout = self.kwargs['timeout'] = xargs.get('timeout', timeout)
if 'stdouterr' in xargs:
if 'stdout' in xargs or 'stderr' in xargs:
raise Exception('Cannot specify both stdouterr= and stdout/stderr= arguments at the same time: %s'%xargs)
# if user specified stdouterr, don't override it with the defaults set in the xargs
self.stdout = self.stderr = None
else:
self.stdout = self.kwargs['stdout'] = xargs.get('stdout', stdout)
self.stderr = self.kwargs['stderr'] = xargs.get('stderr', stderr)
if ignoreExitStatus==None:
ignoreExitStatus = project.defaultApamaIgnoreExitStatus.lower()=='true' if project and hasattr(project, 'defaultApamaIgnoreExitStatus') else False
self.ignoreExitStatus = self.kwargs['ignoreExitStatus'] = xargs.get('ignoreExitStatus', ignoreExitStatus)
# arguments should be handled differently, need to remove from kwargs since it's common for callers to
# want to add to it and manipulate it directly
self.arguments = self.kwargs.pop('arguments', arguments)
# sanity checking
if self.arguments == None: self.arguments = [] # ensure it is always a list
if isstring(self.arguments): raise Exception('arguments must be a list not a string')