Source code for apama.iaf

#!/usr/bin/env python3
# Copyright(c) 2007, 2013 Progress Software Corporation (PSC).  All rights
# Copyright (c) 2013, 2015-2016,2018,2019-2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains PySys extensions for starting and interacting with IAF processes. 
"""

import sys, os, string, logging, socket, copy

from pysys import log
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.filereplace import replace as rep

from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
from apama.common import XArgsHolder
from apama.common import stringToUnicode
from xml.dom.minidom import getDOMImplementation


[docs]class IAFHelper(ApamaServerProcess): """Helper class for the Software AG Apama Integration Adapter Framework (IAF). The IAF Helper class has been designed for use as an extension module to the PySys System Test Framework, offering the ability to configure, start and interact with an instance of the IAF. The usage pattern of the class is to create an instance per IAF, and to then make method calls onto the instance to perform operations such as to start the component, request reload of the configuration file, perform management operationes etc. For example:: iaf = IAFHelper(self, config="iaf-config.xml") iaf.start(logfile="iaf.log") iaf.client(reload=True) Process related methods of the class declare a method signature which includes named parameters for the most frequently used options to the method. They also declare the ``**xargs`` parameter to allow passing in of additional supported arguments to the process. The additional arguments that are currently supported via ``**xargs`` are: - workingDir: The default value for the working directory of a process - state: The default state of the process (`pysys.constants.BACKGROUND` | `pysys.constants.FOREGROUND`) - timeout: The default value of the process timeout - stdout: The default value of the process stdout - stderr: The default value of the process stderr - arguments: List of extra arguments to be passed to the process This means that legitimate calls to the start method include:: iaf.start(logfile="iaf.log") iaf.start(logfile="iaf.log", stdout="iaf1.out") iaf.start(state=FOREGROUND, timeout=5*60) :ivar ~.parent: Reference to the PySys pysys.basetest.BaseTest testcase (or pysys.process.user.ProcessUser) instantiating this instance. :ivar int ~.port: Port used for starting and interaction with the process. If no port parameter is used in the argument list an available port will be dynamically found from the OS and used for starting the process, and performing all operations against it. :ivar str ~.host: Hostname for interaction with a remote process. The host parameter is only used to perform operations against a remote process started externally to the PySys framework - the class does not support the starting of a remote process. :ivar dict(str,str) ~.environ: The environment for running the process :ivar str ~.name: A display name for this process (default is "iaf"), also used for the default stdout/err filenames. :param ~.startArgs: Default values can be provided here for any keyword arguments that are supported by start(). """ def __init__(self, parent, port=None, host=None, name='iaf', **startArgs): ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host) self.componentType = 'iaf' self.__startArgs = startArgs
[docs] def addToClassPath(self, path): """Add the supplied path to the APAMA_IAF_CLASSPATH environment variable for starting this IAF instance. """ if "APAMA_IAF_CLASSPATH" in self.environ: self.environ["APAMA_IAF_CLASSPATH"] = r"%s%s%s" % (self.environ["APAMA_IAF_CLASSPATH"], os.pathsep, os.path.normpath(path)) else: self.environ["APAMA_IAF_CLASSPATH"] = os.path.normpath(path)
[docs] def addToPath(self, path): """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this IAF instance. """ if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" else: key = "PATH" if key in self.environ: self.environ[key] = r"%s%s%s" % (self.environ[key], os.pathsep, os.path.normpath(path)) else: self.environ[key] = os.path.normpath(path)
[docs] def start(self, configname=None, configdir=None, replace=None, logfile=None, verbosity=None, environ=None, waitForServerUp=None, **xargs): """Start the IAF. Note that default values for any of this method's parameters can be passed to the constructor when this object is instantiated. Start an IAF using the supplied configuration file. If the ``configdir`` argument is not supplied, the location of the configuration file defaults to the testcase input directory. The configuration file can first be tailored to replaced token values within the file with values required at run time using the``replace`` argument. For replace of the form :: replace = {"@logs_dir@":"/var/tmp/logs"} any tokens in the coniguration file directly matching @logs_dir@ will be replaced with the value "/var/tmp/logs". Note that multiple token value pairs can be supplied in the replace dictionary. :param str configname: The IAF configuration file or template name :param str configdir: The directory containing the IAF configuration file or template :param str logfile: Name of the IAF log file :param str verbosity: The verbosity level of the IAF logging :param dict[str,str] environ: Map of environment variables to override. :param dict[str,str] replace: A dictionary of tokens / values to be replaced in the configuration file :param bool waitForServerUp: Set to False to disable automatically waiting until the component is ready. :param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'iaf') dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name) # direct arguments take precedence, but startargs can be used for anything else # note the different logic required for booleans/lists/others startargs = copy.deepcopy(self.__startArgs) def argsExtractor(argName, value, defaultValue): tempValue = startargs.pop(argName, None) if value is not None: return value return defaultValue if tempValue is None else tempValue configname = argsExtractor('configname',configname,None) configdir = argsExtractor('configdir',configdir,self.parent.input) replace = argsExtractor('replace',replace,{}) self.logfile = logfile = argsExtractor('logfile',logfile,None) verbosity = argsExtractor('verbosity',verbosity,None) environ = argsExtractor('environ',environ,None) waitForServerUp = argsExtractor('waitForServerUp',waitForServerUp,True) xargs = dict(list(startargs.items()) + list(xargs.items())) # xargs takes precedence assert configname, 'configname must be specified' # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project) # tailor the configuration file output = os.path.join(self.parent.output, configname) rep(os.path.join(configdir, configname), output, replace, marker='') # set the arguments to the process arguments = [] arguments.extend(["--name", self.name, "-p", "%d" % self.port]) if logfile: arguments.extend(["-f", logfile]) if verbosity: arguments.extend(["-l", verbosity]) if xargs.arguments: arguments.extend(xargs.arguments) arguments.append(os.path.join(self.parent.output, configname)) # start the process - unicoding environment needs moving into the PySys framework env = {} for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) if environ: for key in environ: env[stringToUnicode(key)] = stringToUnicode(environ[key]) self.process = None try: hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), onError=lambda process: None, **xargs.kwargs) self.process = hprocess if waitForServerUp: self.waitForComponentUp(timeout=xargs.timeout) except Exception: for f in [logfile, xargs.stdout, xargs.stderr]: if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*'], tail=True): break raise return hprocess
[docs] def client(self, reload=False, suspend=False, resume=False, **xargs): """Perform client operations against the IAF (reload, suspend, resume). Runs as a foreground process by default. :param reload: Request reload of the IAF configuration file :param suspend: Request the IAF to suspend event sending :param resume: Request the IAF to resume event sending :param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'iaf_client') displayName = "iaf_client <%s>"%self.name # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'iafclient') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if reload: arguments.append("--reload") elif suspend: arguments.append("--suspend") elif resume: arguments.append("--resume") if xargs.arguments: arguments.extend(xargs.arguments) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)