Source code for apama.eplplugin

#  $Copyright (c) 2018-2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ 
#  Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG

"""
Contains classes used to define EPL plug-ins written in Python that run inside the Apama correlator.

EPL plug-ins must inherit from `EPLPluginBase` and have methods on them decorated
with the `EPLAction` decorator to declare the EPL type of the methods.

There are also several classes defined to pass custom EPL types from EPL to Python.

"""

from collections import namedtuple
import logging, sys, traceback, inspect

if 'sphinx' not in sys.modules:
	try:
		import correlator_internal
	except:
		raise Exception("Could not import correlator_internal. This probably means you're trying to run the plugin through python directly, which won't work. Instead, try loading it as a plugin in the correlator and calling it from EPL")

Event = namedtuple("Event", "type fields")
Event.__doc__= 'Event is a class containing type and fields.'
Event.type.__doc__='A string containing the fully-qualified name of this Event instance. For example: apama.tests.myEvent'
Event.fields.__doc__='A dictionary containing key-value pairs of field name and value.' 

Location = namedtuple("Location","x1 y1 x2 y2")
Location.__doc__= 'Location class defines a position in Apama using 2 coordinates.'
Location.x1.__doc__ = 'Float defining the x position of the first coordinate pair.'
Location.y1.__doc__ = 'Float defining the y position of the first coordinate pair.'
Location.x2.__doc__ = 'Float defining the x position of the second coordinate pair.'
Location.y2.__doc__ = 'Float defining the y position of the second coordinate pair.'

[docs]class Context(object): """ Context class represents an Apama EPL context. :param name: The string name of the context. :param contextId: The integer id of the context. """ _contextId = 0 _name = "context" def __init__(self, name, contextId) : self._name = name self._contextId = contextId
[docs] def getName(self): """ Retrieve the name of this context. :return: The string name of the context. """ return self._name
[docs] def str(self): """ Stringify the context. :return: A string representation of the context, containing the context name. """ return "Context(" + str(self._name) + ")"
[docs] def repr(self): """ Stringify this context. :return: A string representation of the contexts, containing the context name and ID. """ return "Context(" + str(self._contextId) + "," + str(self._name) + ")"
[docs] def _getContextId(self): """ Get the id of this context. :return: The integer ID of this context. """ return self._contextId
Channel = namedtuple("Channel","channel context") Channel.__doc__ = 'Channel class represents an Apama channel.' Channel.channel.__doc__ = 'A string representing the name for the channel.' Channel.context.__doc__ = 'A context object for the channel.' Any = namedtuple("Any", "type value") Any.__doc__ = 'Any class representing the any variant type in EPL.' Any.type.__doc__ = 'String name of the EPL type contained in the any, may be None if empty. Uses EPL-format type names, for example: dictionary<string, sequence<integer> >' Any.value.__doc__ = 'Contents of the any. Will be of the equivalent Python type to the EPL type named in type, or None if empty.'
[docs]class Correlator(object): """ Contains static methods to interact with the correlator directly from an EPL plug-in, such as by sending events. """
[docs] @staticmethod def sendTo(destination, event, type=None): """ Send an event from Python to a destination within the correlator. sendTo takes two or three arguments, but can only be invoked with positional arguments, not named arguments. For example:: Correlator.sendTo("channelname", "MyEvent(123, 456)") Correlator.sendTo(Channel("channelname"), {"i":123, "j":456}, type="MyEvent") Correlator.sentTo(Correlator.getCurrentContext(), Event("MyEvent", {"i":123, "j":456})) This method may block if sending to a context with a full queue. You may not call this method from the plugin module (static) code or from the plugin constructor. :param destination: Either a string representing a channel name, a Context object or a Channel object containing either a string channel name or a Context object. The event will be sent to the specified context, or any context subscribed to the specified channel. (required) :param event: Either a string in Apama event string format; a dictionary of string field names to values (in which case type must be specified); an Event object. (required) :param str type: The fully-qualified name of an EPL event type. (optional) """ correlator_internal.sendTo(destination, type, event)
[docs] @staticmethod def getCurrentContext(): """ Get the current context we are running in. :return: A `Context` object representing the current execution context, or None if called on a background thread. """ return correlator_internal.getCurrentContext()
[docs]class EPLPluginBase(object): """ This is the base class which all Python EPL plug-in instances should inherit from, and therefore defines all the ways in which the plug-in can interact with the correlator. Methods on a Python EPL plug-in class must be decorated with the `EPLAction` decorator in order to be exposed to EPL. :param init: An opaque structure which must be passed up from the plug-in constructor verbatim. """ class __LogHandler(logging.Handler): def __init__(self, prefix): logging.Handler.__init__(self) self.prefix = prefix def emit(self, record): correlator_internal.log(self.prefix+record.name, record.levelno, record.getMessage()) def __init__(self, init): self.__config = init[0] self.__logger = logging.getLogger(init[1]) self.__logger.setLevel(1) self.__logger.propagate = False self.__logger.addHandler(EPLPluginBase.__LogHandler('plugins.'))
[docs] def getLogger(self): """ Get a reference to the system logger. :return: A ``logging.Logger`` object which can be used to log to the correlator log file. """ return self.__logger
[docs] def getConfig(self): """ Get the plug-in instance configuration. :return: A dictionary with string keys as specified in the configuration file for this plug-in instance. """ return self.__config
@staticmethod def _setRootLogHandler(): logging.getLogger().addHandler(EPLPluginBase.__LogHandler('python.'))
[docs]def EPLAction(actionsig,actionname=None): """ EPLAction is the available decorator that can be set on a method to allow access to the method from EPL. For example:: @EPLAction("action<> returns string", actionname="eplaction") :param str actionsig: The EPL action signature used to access the decorated method. Uses the same syntax as declaring action variables, for example: action<integer> returns string. :param str actionname: The action name to use in EPL to access the decorated method. Optional, defaults to the name of the method in Python. """ def decorator(fn): if( actionname is not None ): fn.__apama_epl_plugin_action = actionname else: fn.__apama_epl_plugin_action = fn.__name__ fn.__apama_epl_plugin_sig = actionsig return fn return decorator
if 'sphinx' not in sys.modules: EPLPluginBase._setRootLogHandler() def __format_traceback(trace): def resolve_scope(module_name, code): # Resolve module. module = sys.modules.get(module_name, None) if not module: return '<hidden-module>' # Check module's functions. symbols = inspect.getmembers(module, inspect.isfunction) for symbol_name,symbol_info in symbols: if symbol_info.__code__ is code: scope = module_name return scope # Check module's classes. symbols = inspect.getmembers(module, inspect.isclass) for symbol_name,symbol_info in symbols: # Check class' methods. members = inspect.getmembers(symbol_info)#, inspect.ismethod) for method_name,method_info in members: if hasattr(method_info, '__code__'): if method_info.__code__ is code: scope = module_name + '.' + symbol_name return scope # Default to the thing's name. This usually happens # when resolving the stack frame for module-level code. return "<module>" tb=[] for frame in traceback.walk_tb(trace): scopename = resolve_scope(frame[0].f_globals['__name__'], frame[0].f_code) tb.append((scopename, frame[0].f_code.co_name, frame[0].f_code.co_filename, frame[0].f_lineno)) return tb