Source code for apama.common

#!/usr/bin/env python3
# Copyright(c) 2007,2013 Progress Software Corporation (PSC).  All rights reserved
# Copyright (c) 2013,2015-2021 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
#
# This module contains internal implementation utility code for pysys-apama, 
# which is unsupported and may be removed at any time. 
#
"""
Contains PySys extensions required by IAF and correlator support. 

"""

# Deprecated and/or not in public API: XArgsHolder, _initProject, _allocateUniqueProcessStdOutErr, stringToUnicode

import time
import os.path

import pysys
from pysys import log
from pysys.constants import *
from pysys.exceptions import *
from pysys.process.helper import ProcessWrapper # only used when we need special handling, startProcess is usually better

def _initProject(project):
	"""
	Internal-only, do not use.

	Checks that the given project object is set and has non-empty
	``APAMA_HOME`` and ``APAMA_WORK`` attributes.

	:param project: The PySys project object to validate.
	:raises AssertionError: If ``project`` is falsy.
	:raises Exception: If ``APAMA_HOME`` or ``APAMA_WORK`` is missing or empty.
	"""
	# Explicit checks rather than assert statements, so that the validation still
	# happens when Python is run with -O (which strips asserts)
	if not project:
		raise AssertionError('project is not set')
	try:
		configured = bool(project.APAMA_HOME and project.APAMA_WORK)
	except Exception:
		# any failure to read the properties is treated the same as them not being set
		configured = False
	if not configured:
		raise Exception("Not running inside an Apama environment. APAMA_HOME and APAMA_WORK not set")

# Cached result of _getApamaCommonJRE; None until the first call
__APAMA_COMMON_JRE = None
def _getApamaCommonJRE(project):
	"""
	Internal-only, do not use. Despite the name, this is usually the path to a JDK (NOT just a JRE).

	The location is taken from the first of these that is set: the optional
	``APAMA_COMMON_JRE`` project property, the ``APAMA_JRE`` environment variable,
	or ``<APAMA_HOME>/../jvm/jvm``. The result is computed once and cached in a
	module-level variable for subsequent calls.

	:param project: The PySys project, providing ``getProperty`` and ``APAMA_HOME``.
	:return: The normalized path.
	"""
	global __APAMA_COMMON_JRE
	if __APAMA_COMMON_JRE is None:
		__APAMA_COMMON_JRE = os.path.normpath(
			project.getProperty('APAMA_COMMON_JRE', '') or # setting this project property is optional, if it's not set we'll figure it out
			os.getenv('APAMA_JRE') or # this is the "modern" (post Apama 4.0) name for the JRE property
			# this is a module-level function so there is no "self"; use the project we were given
			os.path.join(project.APAMA_HOME, "..", "jvm", "jvm")
			)
		# Java 8 JDK has a jre/ subdir whereas Java 9+ does not; the following allows Apama 10.7 and projects created using it and earlier versions to
		# work against new JDKs
		if __APAMA_COMMON_JRE.endswith('jre') and (not os.path.exists(__APAMA_COMMON_JRE)) and os.path.exists(os.path.dirname(__APAMA_COMMON_JRE)):
			log.warning('The APAMA_COMMON_JRE property value contains a /jre suffix which does not exist in this version "%s"; please update your pysysproject.xml', __APAMA_COMMON_JRE)
			__APAMA_COMMON_JRE = os.path.dirname(__APAMA_COMMON_JRE)
	return __APAMA_COMMON_JRE

class ApamaServerProcess(object):
	"""Abstract parent helper class for Apama server processes.

	:ivar ~.parent: Reference to the PySys pysys.basetest.BaseTest testcase (or pysys.process.user.ProcessUser) instantiating this instance.
	:ivar int ~.port: Port used for starting and interaction with the process. If no port parameter is used in the argument list an
		available port will be dynamically found from the OS and used for starting the process, and performing all operations against it.
	:ivar str ~.host: Hostname for interaction with a remote process. The host parameter is only used to perform operations against a
		remote process started externally to the PySys framework - the class does not support the starting of a remote process.
	:ivar dict(str,str) ~.environ: The environment for running the process
	"""

	def __init__(self, parent, name, port=None, host=None):
		"""
		:param parent: The testcase (or other process user) that owns this component.
		:param str name: A name for this component, used in display names and output file names.
		:param port: The port to use; if not specified, one is allocated using ``parent.getNextAvailableTCPPort()``.
		:param str host: The host to use; defaults to ``localhost``.
		"""
		self.parent = parent
		self.process = None # only ever valid if started locally

		# validate the project before anything below tries to use it
		assert self.parent.project, 'project is not set - PySys environment not configured correctly'
		_initProject(self.parent.project)

		def cleanup():
			try:
				if self.process and self.process.running(): # only if started by us
					self.shutdown(ignoreExitStatus=True)
			except Exception as e:
				# for now don't actually cause a failure if this happens; could add that as an option later if it's useful
				self.parent.log.warning('Failed to shutdown %s: %s', self, e)
		if self.parent.project.getProperty('shutdownApamaComponentsAfterTest', True):
			parent.addCleanupFunction(cleanup)

		self.port = int(port) if port else port
		self.host = host
		if self.port is None: self.port = parent.getNextAvailableTCPPort()
		if self.host is None: self.host = "localhost"

		# take a private copy of the environment so it can be customized for this process
		self.environ = dict(os.environ)
		self.log = self.parent.log

		self.name = name
		self.componentType = 'unknown'

	def waitForLogGrep(self, expr, errorExpr=['(ERROR|FATAL|Failed to parse) .*'], ignores=None, mappers=None, **kwargs):
		"""
		Wait until this component's ``logfile`` contains the specified regular expression, such as a
		message indicating that the test has completed.

		For example::

			correlator.waitForLogGrep(r'INFO .* Test completed')

		This is a wrapper around `pysys.basetest.BaseTest.waitForGrep` that adds functionality specific to
		Apama components, such as aborting early (with a clear outcome reason) if an error message is found in the
		log file or the process dies while waiting (with mappers to convert multi-line stack traces to a
		self-contained message), and always using the UTF-8 encoding to read the log file.

		Any of the standard ``waitForGrep`` keywords such as ``ignores=``, ``timeout=`` may also be passed in as keyword arguments.

		:param str expr: The regular expression to search for in the log file. Remember to escape characters such as ``[]()\\``.
		:param list[str] errorExpr: Regular expressions which, if matched, cause the wait to abort early. This list is passed
			through unchanged to ``waitForGrep``.
		:param str condition: The condition to be met for the number of lines matching the regular expression; by default we wait
			until there is at least one occurrence.
		:param int timeout: The number of seconds to wait for the regular expression before giving up and aborting
			the test with `pysys.constants.TIMEDOUT` (unless abortOnError=False in which case execution will continue).
		:param list[str] ignores: A list of regular expressions used to identify lines in the file which should be ignored
			when matching both ``expr`` and ``errorExpr``. By default this is taken from the value of the
			`apama.testplugin.ApamaPlugin.defaultLogIgnores`.
		:param List[callable[str]->str] mappers: A list of filter functions that will be used to pre-process each line of the log.
			Unless explicitly overridden, mappers for EPL and Java will be used (
			`apama.testplugin.ApamaPlugin.JoinEPLStackLines` and `pysys.mappers.JoinLines.JavaStackTrace`).
		:return: A list of the matches; see `pysys.basetest.BaseTest.waitForGrep` for details.

		.. versionadded:: 10.11.0
		"""
		assert self.logfile
		assert hasattr(self.parent, 'apama'), 'This method can only be used if your project is configured with the ApamaTestPlugin'
		if ignores is None: ignores = self.parent.apama.defaultLogIgnores
		from apama.testplugin import ApamaPlugin
		if mappers is None: mappers = [ApamaPlugin.JoinEPLStackLines(), pysys.mappers.JoinLines.JavaStackTrace()]
		return self.parent.waitForGrep(self.logfile, expr, errorExpr=errorExpr, mappers=mappers, ignores=ignores,
			process=self, encoding='utf-8', **kwargs)

	def waitForComponentUp(self, timeout=TIMEOUTS['WaitForSocket'], **xargs):
		"""Block until the component declares itself to be ready for processing.

		Runs ``component_management -w`` against this component's host and port in the background,
		and polls until it completes. If this component was started locally and its process exits
		while waiting, a BLOCKED outcome is added instead of waiting for the full timeout.

		:param timeout: The maximum number of seconds to wait before adding a TIMEDOUT outcome.
		:param xargs: Accepted for compatibility; not used by this method.
		"""
		# set the command and display name
		command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'component_management')

		# set the default stdout and stderr
		stdout, stderr = self.parent.allocateUniqueStdOutErr('waitForComponentUp_%s'%self.name)

		# set the arguments to the process
		arguments = ["-w", "-p", "%d" % self.port, "-n", self.host]

		waitprocess = ProcessWrapper(command, arguments, self.environ, self.parent.output, BACKGROUND, timeout, stdout, stderr,
			displayName='waitForComponentUp %s'%self)
		waitprocess.start()
		try:
			endtime = time.time()+timeout
			while time.time() < endtime and waitprocess.running():
				# if the component was started locally, then early-out if it has failed
				if self.process and not self.process.running():
					self.parent.addOutcome(BLOCKED, '%s failed with exit code %d in waitForComponentUp'%(self.process, self.process.exitStatus),
						abortOnError=self.parent.defaultAbortOnError)
					return # in case it doesn't abort
				time.sleep(0.05)

			if waitprocess.running():
				self.parent.addOutcome(TIMEDOUT, 'Timed out waiting for %s to come up (after %d secs)'%(self, timeout),
					abortOnError=self.parent.defaultAbortOnError)
			elif waitprocess.exitStatus != 0: # shouldn't really happen
				self.parent.addOutcome(BLOCKED, 'component_management failed while waiting for %s to come up, with exit status %d'%
					(self, waitprocess.exitStatus), abortOnError=self.parent.defaultAbortOnError)
			else:
				# a short wait is needed to detect port in use startup failures where the component is already running on the port,
				# to give time for the process to try and fail to bind the port.
				# set it to 0 by default to keep testcases running as fast as possible, but
				# may be set to a higher number for deployments
				sltime = float(getattr(self.parent.project, 'APAMA_START_COMPONENT_PORT_IN_USE_SLEEP_SECS', '0'))
				if sltime > 0: time.sleep(sltime)

				# in case a different component was started on this port from the local one we wanted
				if self.process and not self.process.running():
					self.parent.addOutcome(BLOCKED, '%s failed with exit code %d during waitForComponentUp'%(self.process, self.process.exitStatus),
						abortOnError=self.parent.defaultAbortOnError)
				else:
					# only report success if our own process (when there is one) is still alive
					self.parent.log.info(" Component %s is now running", self)
		finally:
			if waitprocess.running(): waitprocess.stop()

	def shutdown(self, message='Shutdown requested by test', timeout=60, **args):
		"""
		Requests a clean shutdown of the component. If it was started by this object, also waits for the
		process to terminate, and silently ignores requests to shutdown if the process was already stopped.

		:param str message: The reason for the shutdown, passed to ``component_management --shutdown``.
		:param timeout: The timeout in seconds, used both for the shutdown request and for waiting for a locally-started process to exit.
		:param args: Additional keyword arguments passed through to `manage`.
		"""
		if self.process and not self.process.running(): return
		self.manage(arguments=['--shutdown', message], timeout=timeout, displayName='shutdown', **args)
		if self.process: self.process.wait(timeout=timeout)

	def running(self):
		"""
		Returns True if this has a local process that is running, or False if it has stopped, or was not started by this object.
		"""
		# bool() so that we return False rather than None when there is no local process
		return bool(self.process and self.process.running())

	def __repr__(self):
		return '%s<%s%s:%d>'%(
			self.name,
			('pid %s on '%self.process.pid) if (self.process and self.process.pid) else '',
			self.host,
			self.port
		)

	def manage(self, arguments=[], displayName="manage", **xargs):
		"""Execute component_management operations against the process.

		:param arguments: The arguments to be passed to component_management. Must be a list, not a string.
		:param displayName: The name used for the process's stdout/stderr files and (with the component name appended) for display.
		:param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
		:return: The result of ``parent.startProcess``.
		"""
		# set the command and display name
		command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'component_management')

		assert not isinstance(arguments, str) # cannot pass a string in here

		dstdout, dstderr = _allocateUniqueProcessStdOutErr(self.parent, displayName)

		# transform xargs into an instance of the xargs holder class
		xargs = XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, arguments=arguments, project=self.parent.project)

		# set the arguments to the process (a new list, so the caller's list is never modified)
		arguments = ["-p", "%d" % self.port]
		if self.host: arguments.extend(["-n", self.host])
		if xargs.arguments: arguments.extend(xargs.arguments)

		if displayName == 'manage':
			displayName = '%s <%s> [%s]'%(displayName, self.name, ' '.join(xargs.arguments))
			if '-s' in xargs.arguments or '--shutdown' in xargs.arguments:
				displayName = 'manage <%s> --shutdown'%self.name # keep it concise
		else:
			displayName = '%s <%s>'%(displayName, self.name)

		# start the process
		return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
def _allocateUniqueProcessStdOutErr(parent, processKey):
	"""
	Allocate filenames of the form processKey.out[.n] (similarly for .err)
	for a process that is about to be started, such that the names are not
	repeated within the specified parent's lifetime.

	Not for customer use, and legacy only - the pysys allocateUniqueStdOutErr
	method is recommended instead.

	:param parent: The owner whose ``output`` directory is used; a counter per processKey is stored on it
		in the ``_apama_uniqueProcessKeys`` attribute.
	:param str processKey: The base name for the files.
	:return: A tuple of (stdout, stderr).
	"""
	# NB: use this only for existing methods (backwards compat) - for anything new the
	# pysys 1.1 parent.allocateUniqueStdOutErr method is better, but as it uses a different format
	# can't use it out of the box
	if not hasattr(parent, '_apama_uniqueProcessKeys'):
		parent._apama_uniqueProcessKeys = {}

	newval = parent._apama_uniqueProcessKeys.get(processKey, -1)+1
	parent._apama_uniqueProcessKeys[processKey] = newval

	# the first allocation for a key has no numeric suffix
	suffix = '.%d'%newval if newval > 0 else ''

	return (
		os.path.join(parent.output, processKey+'.out'+suffix),
		os.path.join(parent.output, processKey+'.err'+suffix),
	)

def stringToUnicode(s):
	"""
	:deprecated: Internal helper function: do not use, will be removed in a future release.

	:meta private:

	Returns the specified string unchanged.
	"""
	return s

class XArgsHolder:
	"""
	:deprecated: Internal helper class: do not use, will be removed in a future release.

	:meta private:

	Class to store all supported xargs method arguments used in the helper classes.

	Methods in the Apama helper classes define process related methods with a signature including
	named parameters for the most commonly used options to the process, and an **xargs parameter to allow
	passing through of additional supported named parameters, e.g. workingDir, state, timeout, stdout, stderr and
	arguments.

	The XArgsHolder class takes the **xargs parameter from a method call (which is treated by python as a
	dictionary of name value pairs) and default values for the workingDir, state, timeout, stdout, stderr and
	arguments; these are used to set data attributes to the class instance with the default values. The class
	then iterates over the **xargs and over-writes the default values if they exist in the parameter. This
	allows a user of the class to create an instance to hold the additional arguments with default values in
	the first case, but for these to be replaced if an alternative value is supplied via **xargs, e.g. the
	user of the method wants to explicitly set the stdout etc.

	The kwargs member contains a dictionary that can be passed through to startProcess using **kwargs,
	containing every setting except for arguments which should always be handled directly.

	Typical usage would be startProcess(arguments=xargs.arguments, **xargs.kwargs)
	"""

	def __init__(self, xargs, workingDir=None, state=FOREGROUND, timeout=TIMEOUTS['WaitForProcess'],
			stdout=None, stderr=None, arguments=None, ignoreExitStatus=None, project=None):
		"""Create an instance of the XArgsHolder class.

		:param xargs: The variable argument list passed into the method, which override the defaults provided by the other arguments
		:param workingDir: The default value for the working directory of a process
		:param state: The default state of the process (L{pysys.constants.BACKGROUND} | L{pysys.constants.FOREGROUND})
		:param timeout: The default value of the process timeout
		:param stdout: The default value of the process stdout
		:param stderr: The default value of the process stderr
		:param arguments: Extra command line arguments to be passed to the process. If neither this nor
			xargs provides any, a new empty list is used.
		:param ignoreExitStatus: Set to True to change default behaviour for this process to prevent
			non-zero exit code being treated as a test failure. The default is normally False.
			The project setting defaultApamaIgnoreExitStatus can be used to control this globally.
		:param project: Optional project object, consulted only for the defaultApamaIgnoreExitStatus setting.
		:raises Exception: If both stdouterr= and stdout=/stderr= are specified in xargs, or if arguments is a string.
		"""
		# this allows arbitrary extra args to be passed to startProcess e.g. abortOnError, anything we add in future
		self.kwargs = dict(xargs)

		# set kwargs and members for known keywords. Give priority to xargs provided if specified.
		self.workingDir = self.kwargs['workingDir'] = xargs.get('workingDir', workingDir)
		self.state = self.kwargs['state'] = xargs.get('state', state)
		self.timeout = self.kwargs['timeout'] = xargs.get('timeout', timeout)

		if 'stdouterr' in xargs:
			if 'stdout' in xargs or 'stderr' in xargs:
				raise Exception('Cannot specify both stdouterr= and stdout/stderr= arguments at the same time: %s'%xargs)
			# if user specified stdouterr, don't override it with the defaults set in the xargs
			self.stdout = self.stderr = None
		else:
			self.stdout = self.kwargs['stdout'] = xargs.get('stdout', stdout)
			self.stderr = self.kwargs['stderr'] = xargs.get('stderr', stderr)

		if ignoreExitStatus is None:
			ignoreExitStatus = (project.defaultApamaIgnoreExitStatus.lower()=='true'
				if project and hasattr(project, 'defaultApamaIgnoreExitStatus') else False)
		self.ignoreExitStatus = self.kwargs['ignoreExitStatus'] = xargs.get('ignoreExitStatus', ignoreExitStatus)

		# arguments should be handled differently, need to remove from kwargs since it's common for callers to
		# want to add to it and manipulate it directly
		self.arguments = self.kwargs.pop('arguments', arguments)

		# sanity checking
		# ensure it is always a list; a fresh one per instance so that callers who
		# manipulate it cannot affect any other XArgsHolder
		if self.arguments is None: self.arguments = []
		if isinstance(self.arguments, str): raise Exception('arguments must be a list not a string')