#!/usr/bin/env python3
# Copyright(c) 2007, 2013 Progress Software Corporation (PSC). All rights
# Copyright (c) 2013, 2015-2016,2018 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
""" Contains PySys extensions for starting and interacting with IAF processes.
"""
import sys, os, string, logging, socket, copy
from pysys import log
from pysys.constants import *
from pysys.exceptions import *
from pysys.utils.filereplace import replace as rep
from apama.common import ApamaServerProcess, _allocateUniqueProcessStdOutErr
from apama.common import XArgsHolder
from apama.common import stringToUnicode
from xml.dom.minidom import getDOMImplementation
[docs]class IAFHelper(ApamaServerProcess):
"""Helper class for the Software AG Apama Integration Adapter Framework (IAF).
The IAF Helper class has been designed for use as an extension module to the PySys System Test
Framework, offering the ability to configure, start and interact with an instance of the IAF. The usage
pattern of the class is to create an instance per IAF, and to then make method calls onto the instance
to perform operations such as to start the component, request reload of the configuration file, perform
management operationes etc. For example::
iaf = IAFHelper(self, config="iaf-config.xml")
iaf.start(logfile="iaf.log")
iaf.client(reload=True)
Process related methods of the class declare a method signature which includes named parameters for the
most frequently used options to the method. They also declare the **xargs parameter to allow passing in of
additional supported arguments to the process. The additional arguments that are currently supported via
**xargs are::
workingDir: The default value for the working directory of a process
state: The default state of the process (pysys.constants.BACKGROUND | pysys.constants.FOREGROUND)
timeout: The default value of the process timeout
stdout: The default value of the process stdout
stderr: The default value of the process stderr
arguments: List of extra arguments to be passed to the process
This means that legitimate calls to the start method include::
iaf.start(logfile="iaf.log")
iaf.start(logfile="iaf.log", stdout="correlator1.out")
iaf.start(state=FOREGROUND, timeout=5)
@ivar parent: Reference to the PySys testcase instantiating this class instance
@type parent: pysys.basetest
@ivar port: Port used for starting and interaction with the Event Correlator
@type port: integer
@ivar host: Hostname for interaction with a remote Event Correlator
@type host: string
@ivar environ: The environment for running the IAF
@type environ: dictionary
"""
[docs] def __init__(self, parent, port=None, host=None, name='iaf'):
"""Create an instance of the IAFHelper class.
If no port parameter is used in the argument list an available port will be dynamically found
from the OS and used for starting the IAF, and performing all operations against it. The host
parameter is only used to perform operations against a remote IAF started external to the PySys
framework - the class does not support the starting of an IAF remote to the localhost.
@param parent: Reference to the parent PySys testcase
@param port: The port used for starting and interacting with the IAF
@param host: The hostname used for interaction with a remote IAF
@param name: A display name for this process (default is "iaf"), also used for the default
stdout/err filenames.
"""
ApamaServerProcess.__init__(self, parent, name=name, port=port, host=host)
[docs] def addToClassPath(self, path):
"""Add the supplied path to the APAMA_IAF_CLASSPATH environment variable for starting this IAF instance.
"""
if "APAMA_IAF_CLASSPATH" in self.environ:
self.environ["APAMA_IAF_CLASSPATH"] = r"%s%s%s" % (self.environ["APAMA_IAF_CLASSPATH"], ENVSEPERATOR, os.path.normpath(path))
else:
self.environ["APAMA_IAF_CLASSPATH"] = os.path.normpath(path)
[docs] def addToPath(self, path):
"""Add the supplied path to the PATH (win32) or LD_LIBRARY_PATH (unix) environment variable for starting this IAF instance.
"""
if PLATFORM in [ "sunos", "linux" ]: key = "LD_LIBRARY_PATH"
else: key = "PATH"
if key in self.environ:
self.environ[key] = r"%s%s%s" % (self.environ[key], ENVSEPERATOR, os.path.normpath(path))
else:
self.environ[key] = os.path.normpath(path)
[docs] def start(self, configname, configdir=None, replace={}, logfile=None, verbosity=None, waitForServerUp=True, **xargs):
"""Start the IAF.
Start an IAF using the supplied configuration file. If the C{configdir} argument is not
supplied, the location of the configuration file defaults to the testcase input
directory. The configuration file can first be tailored to replaced token values within
the file with values required at run time using the C{replace} argument. For replace of the
form ::
replace = {"@logs_dir@":"/var/tmp/logs"}
any tokens in the coniguration file directly matching @logs_dir@ will be replaced with the
value "/var/tmp/logs". Note that multiple token value pairs can be supplied in the replace
dictionary.
@param configname: The IAF configuration file or template name
@param configdir: The directory containing the IAF configuration file or template
@param logfile: Name of the IAF log file
@param verbosity: The verbosity level of the IAF logging
@param replace: A dictionary of tokens / values to be replaced in the configuration file
@param waitForServerUp: Set to False to disable automatically waiting until the component is ready
@param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'iaf')
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, self.name)
if not configdir: configdir = self.parent.input
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, state=BACKGROUND, stdout=dstdout, stderr=dstderr, timeout=TIMEOUTS['WaitForSocket'], project=self.parent.project)
# tailor the configuration file
output = os.path.join(self.parent.output, configname)
rep(os.path.join(configdir, configname), output, replace, marker='')
# set the arguments to the process
arguments = []
arguments.extend(["--name", self.name, "-p", "%d" % self.port])
if logfile: arguments.extend(["-f", logfile])
if verbosity: arguments.extend(["-l", verbosity])
if xargs.arguments: arguments.extend(xargs.arguments)
arguments.append(os.path.join(self.parent.output, configname))
# start the process - unicoding environment needs moving into the PySys framework
env = {}
for key in self.environ: env[stringToUnicode(key)] = stringToUnicode(self.environ[key])
self.process = None
try:
hprocess = self.parent.startProcess(command, arguments, env, displayName=str(self), **xargs.kwargs)
self.process = hprocess
if waitForServerUp:
self.waitForComponentUp(timeout=xargs.timeout)
except Exception:
for f in [logfile, xargs.stdout, xargs.stderr]:
if self.parent.logFileContents(f, includes=[' ERROR .*', ' FATAL .*'], tail=True): break
raise
return hprocess
[docs] def client(self, reload=False, suspend=False, resume=False, **xargs):
"""Perform client operations against the IAF (reload, suspend, resume).
Runs as a foreground process by default.
@param reload: Request reload of the IAF configuration file
@param suspend: Request the IAF to suspend event sending
@param resume: Request the IAF to resume event sending
@param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, arguments, workingDir
"""
# set the command and display name
command = os.path.join(self.parent.project.APAMA_HOME, 'bin', 'iaf_client')
displayName = "iaf_client <%s>"%self.name
# set the default stdout and stderr
dstdout,dstderr = _allocateUniqueProcessStdOutErr(self.parent, 'iafclient')
# transform xargs into an instance of the xargs holder class
xargs=XArgsHolder(xargs, stdout=dstdout, stderr=dstderr, project=self.parent.project)
# set the arguments to the process
arguments = []
arguments.extend(["-p", "%d" % self.port])
if self.host: arguments.extend(["-n", self.host])
if reload: arguments.append("--reload")
elif suspend: arguments.append("--suspend")
elif resume: arguments.append("--resume")
if xargs.arguments: arguments.extend(xargs.arguments)
# start the process
return self.parent.startProcess(command, arguments, self.environ, displayName=displayName, **xargs.kwargs)
[docs] def waitForIAFUp(self, *args, **kwargs):
"""Block until the IAF declares itself to be ready for processing.
@deprecated: Use waitForComponentUp instead.
"""
self.waitForComponentUp(*args, **kwargs)