Requirements of a transport chain manager plug-in class
A transport plug-in can control the lifetime of chains involving that transport, by providing a dynamic chain manager. The chain manager can decide when to create or destroy chains, and is typically controlled by either listening to channel subscriptions from the correlator host, or by listening to external connections.
For example, any topic or queue on a message bus can be exposed dynamically without having to provide a list of the topics/queues to connect to. On a channel-created notification, the chain manager would check if there is a topic/queue to which it can connect, and create a chain instance to connect to that topic/queue on demand.
Alternatively, the chain manager may listen for new incoming connections, for example by holding a server socket, and create a suitable chain instance to handle the messages on each connection it accepts. In both cases, the chain manager will typically hold some connection object, which it then needs to pass to transport instances when they are created. Thus, the chain manager and transport are usually tightly coupled, and a chain manager can only create chains using its own transport class.
A transport that uses a dynamic chain manager to create its instances consists of a subclass of AbstractTransport (or AbstractSimpleTransport) and a subclass of AbstractChainManager. The chain manager subclass is the class that must be specified in the configuration file's connectivityPlugins section. See Configuration file for connectivity plug-ins and Requirements of a plug-in class.
The chain manager is responsible for:
*Creating and destroying chains as needed, often in response to notifications about the channels that the EPL application is using or to handle new connections initiated from another process. Some managers will create a single chain for sending messages in both directions on a given channel (towards host and towards transport), others may create separate chains for each direction, or may only support one direction. For detailed information about this, see Creating dynamic chains from a chain manager plug-in. In summary, there are two main aspects to chain creation:
  *Selecting which chain definition to use when creating a new chain instance, if there is more than one chain definition available for this transport.
  *Instantiating the transport plug-in during creation of the chain, by calling the transport's constructor. The chain manager can simply pass through the logger and params arguments to a transport constructor with the same signature as the createTransport method, or can pass additional information that the transport needs, such as a reference to the chain manager or connection, or information about the host channel(s) the chain is sending to/from.
*Instantiating and managing the lifetime of any connection to an external server or other resources that should be shared by all associated transports. Usually, it is undesirable for each transport or chain to have its own separate connection to any external server that the transport is using, as the number of chains may be large and, in many protocols, connections are heavyweight entities. The chain manager can create its connections at any time. It is recommended to create the initial connection in the chain manager's start() method if it is desirable for the correlator to delay coming up until the connection is established, and for the correlator to fail to start if an exception is thrown while making the initial connection. If the correlator should not wait for the connection, the initial connection should instead be made on a background thread created by the start() method.
*Optionally, reporting status information that applies to the chain manager rather than to individual transports. For example, status about a connection shared across all transports could be reported by the chain manager, as could aggregated KPI statistics from all transport chains.
The transport class is responsible only for sending and/or receiving messages, often making use of a connection owned by the chain manager. Transports can also report their own status values if desired, though if it is likely there will be a large number of transports, individual status for each may be less useful and more expensive to report than aggregated status for the whole chain manager.
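The two options for establishing the initial shared connection can be sketched as follows. This is a minimal illustration in plain Java, not the Apama API: ConnectionOwningManager, its connector supplier, and the blockStartup flag are hypothetical names, and a String stands in for a real connection object.

```java
import java.util.function.Supplier;

/** Illustrative stand-in only; this is NOT the Apama AbstractChainManager class. */
class ConnectionOwningManager {
    private final Supplier<String> connector;   // hypothetical: opens the shared connection, may throw
    private final boolean blockStartup;         // hypothetical: true = connect inside start()
    private volatile String connection;         // null until the connection is established
    private Thread connectThread;

    ConnectionOwningManager(Supplier<String> connector, boolean blockStartup) {
        this.connector = connector;
        this.blockStartup = blockStartup;
    }

    void start() {
        if (blockStartup) {
            // Connect synchronously: startup waits for the connection, and any
            // exception propagates to the caller so that startup fails.
            connection = connector.get();
            return;
        }
        // Otherwise connect on a background thread created by start().
        connectThread = new Thread(() -> {
            try {
                connection = connector.get();
            } catch (RuntimeException e) {
                // A real manager would log the failure and probably retry.
            }
        }, "chain-manager-connect");
        connectThread.setDaemon(true);
        connectThread.start();
    }

    /** The single connection shared by all transports; may still be null. */
    String sharedConnection() {
        return connection;
    }

    /** Test helper: waits for the background connection attempt, if any. */
    void awaitBackgroundConnect() {
        if (connectThread == null) {
            return;
        }
        try {
            connectThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With the background option, transports must tolerate sharedConnection() returning null until the connection attempt has completed.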
Every chain manager is required to implement the following:
*A public constructor that will be called during correlator startup for each configured dynamicChainManager, with the same signature as the AbstractChainManager constructor.
*createTransport
*start
*shutdown
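As a rough sketch of how these four members fit together, the following self-contained Java uses simplified stand-in types. SketchChainManager and SketchTransport are hypothetical and only mirror the shape described above; the real AbstractChainManager and AbstractTransport classes take a logger and parameter objects, so consult the Javadoc for the actual signatures. BusChainManager, BusTransport, and the "host" and "topic" keys are likewise invented for illustration.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration only. The real base classes are
// provided by the Apama connectivity API and have richer signatures.
abstract class SketchTransport {
    abstract String describe();
}

abstract class SketchChainManager {
    protected final Map<String, Object> managerConfig;

    SketchChainManager(Map<String, Object> managerConfig) {
        this.managerConfig = managerConfig;
    }

    abstract SketchTransport createTransport(Map<String, Object> params) throws Exception;
    abstract void start() throws Exception;
    abstract void shutdown() throws Exception;
}

/** Transport that borrows the connection owned by its manager. */
class BusTransport extends SketchTransport {
    private final BusChainManager manager;
    private final String topic;

    BusTransport(BusChainManager manager, Map<String, Object> params) {
        this.manager = manager;
        this.topic = String.valueOf(params.get("topic"));
    }

    @Override
    String describe() {
        return topic + " via " + manager.connectionName();
    }
}

class BusChainManager extends SketchChainManager {
    private String connection; // stands in for a real shared connection object

    // Public constructor with the same signature as the base class constructor.
    public BusChainManager(Map<String, Object> managerConfig) {
        super(managerConfig);
    }

    @Override
    SketchTransport createTransport(Map<String, Object> params) {
        // Pass extra information (here, the manager itself) to the transport.
        return new BusTransport(this, params);
    }

    @Override
    void start() {
        connection = "bus://" + managerConfig.get("host");
    }

    @Override
    void shutdown() {
        connection = null; // release shared resources
    }

    String connectionName() {
        return connection;
    }
}
```

Passing the manager to the transport constructor is one way to reflect the tight coupling between a chain manager and its own transport class.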
The AbstractChainManager base class has a number of member fields that provide access to logging, the configuration for all dynamic chain definitions associated with its transport, and a ChainManagerHost interface which supports creating chains and registering channel listeners.
A typical chain manager would use its start() method to create any required connection(s) to external servers, and to add a ChannelLifecycleListener providing notifications when channels with a specific prefix are created or destroyed.
It is possible to listen for all channels regardless of prefix, but using a prefix to limit the subset of channels monitored by each chain manager is recommended to improve performance. The ChannelLifecycleListener will fire to indicate that a channel has been created when the channel name is used for the first time, typically as a result of the Apama application calling monitor.subscribe(channel) or send event to channel. When this happens, the manager must first decide whether it needs to have a chain for the specified channel, as some managers may only wish to take action if a channel with the specified name exists on the external system they are connected to. The manager must also check whether it already has a chain for this channel in the specified direction, since in some situations the listener will notify about creation of the same channel more than once (see flushChannelCache in Shutting down and managing components). If the manager has established that a chain is needed for this channel and none already exists, it should create one before returning from the listener callback. If a chain already exists for this channel but is no longer needed, the manager should destroy it. In all other cases, it should do nothing.
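The decision logic for a channel-created notification can be sketched in plain Java as shown below. This is an illustration under stated assumptions, not the ChannelLifecycleListener interface itself: ChannelCreatedHandler, SketchDirection, and the set of external topic names are hypothetical, and a String stands in for a chain. With the real API the prefix filter is applied when the listener is registered; it is repeated here only to keep the sketch self-contained.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the direction of a chain.
enum SketchDirection { TOWARDS_HOST, TOWARDS_TRANSPORT }

/** Illustrative only; not the Apama listener API. */
class ChannelCreatedHandler {
    private final String prefix;
    private final Set<String> externalTopics; // topics known to exist on the external system
    private final Map<String, String> chains = new ConcurrentHashMap<>();

    ChannelCreatedHandler(String prefix, Set<String> externalTopics) {
        this.prefix = prefix;
        this.externalTopics = externalTopics;
    }

    /** Returns true only if this call created a new chain. */
    boolean onChannelCreated(String channel, SketchDirection direction) {
        if (!channel.startsWith(prefix)) {
            return false; // not a channel this manager handles
        }
        String key = direction + ":" + channel;
        String topic = channel.substring(prefix.length());
        if (!externalTopics.contains(topic)) {
            // No chain is needed; destroy one left over from earlier, if any.
            chains.remove(key);
            return false;
        }
        // The same channel may be notified more than once, so create a chain
        // only if none exists yet for this channel and direction.
        return chains.putIfAbsent(key, "chain-for-" + key) == null;
    }

    int chainCount() {
        return chains.size();
    }
}
```

Because the check and the creation happen in a single putIfAbsent call, a repeated notification for the same channel and direction leaves the existing chain untouched.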
The first EPL monitor.subscribe(channel) or send event to channel call to use a channel with a registered listener will block until the listener returns to ensure that no messages are missed if the manager does decide to create a chain for that channel. If an error occurs in the chain manager's implementation of the listener callback, it will be logged but no exception is thrown in the EPL application. See Creating dynamic chains from a chain manager plug-in for more details about how to create chains from a dynamic chain manager.
When the onChannelDestroyed method of the ChannelLifecycleListener is called to indicate that a channel has been destroyed (which implies that there are no remaining EPL monitors using the channel for the specified direction), the chain manager should call destroy on the chain to shut down and disconnect all associated transport and codec plug-ins. Chain managers should not implement reference counting, as the destroy notification will not be fired until all uses of the channel have finished.
Note that at present, channel destroy notifications are only sent for the TOWARDS_HOST direction (monitor.subscribe()) since in the TOWARDS_TRANSPORT direction (send event to channel) there is no unambiguous way of determining when a channel is no longer needed.
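A minimal sketch of the destroy handling, again using hypothetical stand-in types rather than the Apama classes: SketchChain and ChannelDestroyedHandler are invented for illustration, and the map is keyed by channel name only because destroy notifications are described above as applying to a single direction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a chain; only records whether destroy() was called.
class SketchChain {
    private boolean destroyed;

    void destroy() {
        destroyed = true;
    }

    boolean isDestroyed() {
        return destroyed;
    }
}

/** Illustrative only; not the Apama listener API. */
class ChannelDestroyedHandler {
    private final Map<String, SketchChain> chainsByChannel = new HashMap<>();

    /** Records the chain created for a channel (creation logic omitted). */
    synchronized SketchChain register(String channel) {
        SketchChain chain = new SketchChain();
        chainsByChannel.put(channel, chain);
        return chain;
    }

    /**
     * No reference counting: the notification arrives only once all uses of
     * the channel have finished, so the chain is destroyed immediately.
     */
    synchronized void onChannelDestroyed(String channel) {
        SketchChain chain = chainsByChannel.remove(channel);
        if (chain != null) {
            chain.destroy();
        }
    }

    synchronized int activeChains() {
        return chainsByChannel.size();
    }
}
```

Removing the entry before calling destroy means a repeated or unexpected notification for the same channel is simply ignored.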
If using correlator persistence, the required channel lifecycle notifications for channels in use by any persistent monitors will be replayed to chain managers during recovery, so there is no need for chain managers to persist any state across restarts to support correct operation of persistence.
For detailed information about the classes and interfaces involved in creating a chain manager, including more detailed information about how to use the listener API correctly and safely, see the API Reference for Java (Javadoc) on the com.softwareag.connectivity.chainmanagers package. Note that in this release chain manager classes can be written in Java but not C++.
For a complete example of a working Java chain manager and transport, see the Kafka sample in the samples/connectivity_plugin/java/KafkaTransport directory of your Apama installation.
Copyright © 2013-2017 Software AG, Darmstadt, Germany. (Innovation Release)
