Source code for apama.iaf

#!/usr/bin/env python3
# Copyright(c) 2007, 2013 Progress Software Corporation (PSC).  All rights
# Copyright (c) 2013, 2015-2016,2018 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

""" Contains PySys extensions for starting and interacting with IAF processes. 
"""

import sys, os, string, logging, socket, copy

from pysys import log
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.filereplace import replace as rep

from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
from apama.common import XArgsHolder
from apama.common import stringToUnicode
from xml.dom.minidom import getDOMImplementation


[docs]class IAFHelper(ApamaServerProcess): """Helper class for the Software AG Apama Integration Adapter Framework (IAF). The IAF Helper class has been designed for use as an extension module to the PySys System Test Framework, offering the ability to configure, start and interact with an instance of the IAF. The usage pattern of the class is to create an instance per IAF, and to then make method calls onto the instance to perform operations such as to start the component, request reload of the configuration file, perform management operationes etc. For example:: iaf = IAFHelper(self, config="iaf-config.xml") iaf.start(logfile="iaf.log") iaf.client(reload=True) Process related methods of the class declare a method signature which includes named parameters for the most frequently used options to the method. They also declare the **xargs parameter to allow passing in of additional supported arguments to the process. The additional arguments that are currently supported via **xargs are:: workingDir: The default value for the working directory of a process state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND) timeout: The default value of the process timeout stdout: The default value of the process stdout stderr: The default value of the process stderr arguments: List of extra arguments to be passed to the process This means that legitimate calls to the start method include:: iaf.start(logfile="iaf.log") iaf.start(logfile="iaf.log", stdout="correlator1.out") iaf.start(state=FOREGROUND, timeout=5) @ivar parent: Reference to the PySys testcase instantiating this class instance @type parent: pysys.basetest @ivar port: Port used for starting and interaction with the Event Correlator @type port: integer @ivar host: Hostname for interaction with a remote Event Correlator @type host: string @ivar environ: The environment for running the IAF @type environ: dictionary """
[docs] def __init__(self, parent, port=None, host=None, name='iaf'): """Create an instance of the IAFHelper class. If no port parameter is used in the argument list an available port will be dynamically found from the OS and used for starting the IAF, and performing all operations against it. The host parameter is only used to perform operations against a remote IAF started external to the PySys framework - the class does not support the starting of an IAF remote to the localhost. @param parent: Reference to the parent PySys testcase @param port: The port used for starting and interacting with the IAF @param host: The hostname used for interaction with a remote IAF @param name: A display name for this process (default is "iaf"), also used for the default stdout/err filenames. """ ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host)
[docs] def addToClassPath(self, path): """Add the supplied path to the APAMA_IAF_CLASSPATH environment variable for starting this IAF instance. """ if "APAMA_IAF_CLASSPATH" in self.environ: self.environ["APAMA_IAF_CLASSPATH"] = r"%s%s%s" % (self.environ["APAMA_IAF_CLASSPATH"], ENVSEPERATOR, os.path.normpath(path)) else: self.environ["APAMA_IAF_CLASSPATH"] = os.path.normpath(path)
[docs] def addToPath(self, path): """Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this IAF instance. """ if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH" else: key = "PATH" if key in self.environ: self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path)) else: self.environ[key] = os.path.normpath(path)
[docs] def start(self, configname, configdir=None, replace={}, logfile=None, verbosity=None, waitForServerUp=True, **xargs): """Start the IAF. Start an IAF using the supplied configuration file. If the C{configdir} argument is not supplied, the location of the configuration file defaults to the testcase input directory. The configuration file can first be tailored to replaced token values within the file with values required at run time using the C{replace} argument. For replace of the form :: replace = {"@logs_dir@":"/var/tmp/logs"} any tokens in the coniguration file directly matching @logs_dir@ will be replaced with the value "/var/tmp/logs". Note that multiple token value pairs can be supplied in the replace dictionary. @param configname: The IAF configuration file or template name @param configdir: The directory containing the IAF configuration file or template @param logfile: Name of the IAF log file @param verbosity: The verbosity level of the IAF logging @param replace: A dictionary of tokens / values to be replaced in the configuration file @param waitForServerUp: Set to False to disable automatically waiting until the component is ready @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'iaf') dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name) if not configdir: configdir = self.parent.input # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project) # tailor the configuration file output = os.path.join(self.parent.output, configname) rep(os.path.join(configdir, configname), output, replace, marker='') # set the arguments to the process arguments = [] arguments.extend(["--name", self.name, "-p", "%d" % self.port]) if logfile: arguments.extend(["-f", logfile]) if verbosity: arguments.extend(["-l", verbosity]) if xargs.arguments: arguments.extend(xargs.arguments) arguments.append(os.path.join(self.parent.output, configname)) # start the process - unicoding environment needs moving into the PySys framework env = {} for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key]) self.process = None try: hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs) self.process = hprocess if waitForServerUp: self.waitForComponentUp(timeout=xargs.timeout) except Exception: for f in [logfile, xargs.stdout, xargs.stderr]: if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*'], tail=True): break raise return hprocess
[docs] def client(self, reload=False, suspend=False, resume=False, **xargs): """Perform client operations against the IAF (reload, suspend, resume). Runs as a foreground process by default. @param reload: Request reload of the IAF configuration file @param suspend: Request the IAF to suspend event sending @param resume: Request the IAF to resume event sending @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir """ # set the command and display name command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'iaf_client') displayName = "iaf_client <%s>"%self.name # set the default stdout and stderr dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'iafclient') # transform xargs into an instance of the xargs holder class xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project) # set the arguments to the process arguments = [] arguments.extend(["-p", "%d" % self.port]) if self.host: arguments.extend(["-n", self.host]) if reload: arguments.append("--reload") elif suspend: arguments.append("--suspend") elif resume: arguments.append("--resume") if xargs.arguments: arguments.extend(xargs.arguments) # start the process return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def waitForIAFUp(self, *args, **kwargs): """Block until the IAF declares itself to be ready for processing. @deprecated: Use waitForComponentUp instead. """ self.waitForComponentUp(*args, **kwargs)