Configuring pipelining through the client API
Apama provides a C++ Client Software Development Kit (SDK). This allows software written in C++ to interface with a running event correlator or group of event correlators. Apama management tools such as engine_connect are written using this Client Software Development Kit.
The functionality of the Client SDK is found in the lib folder of the Apama installation and consists of the library libengine_client.so.5.2 (on UNIX) or engine_client5.2.lib (on Windows). To code against the library, use the definitions from the engine_client_cpp.hpp header file in the include folder.
Detailed information on how to use the integration library is available in The C Client Software Development Kit (available if you selected Developer during installation). This section looks at the specific methods that allow a developer to programmatically configure two event correlators to communicate in a pipelined arrangement.
The primary class contained in the library is com::apama::engine::EngineManagement.
An object instance of this class represents an event correlator within Apama, and allows a developer to:
inject EPL files,
delete EPL entities,
send events into a correlator,
get a correlator’s current operational status,
connect a receiver of events,
connect a correlator as a consumer of another correlator.
The last capability is of direct interest here, and is supported through the following methods on the EngineManagement class:
/**
* Connect this Event Correlator as an event receiver to another
* Event Correlator.
*
* @param target The Correlator to connect to
* @param channels An array of names representing the channels to subscribe
* to. This is a null-terminated array of pointers to zero-terminated
* char arrays. If this is null or empty, subscribe to all channels.
* @param disconnectSlow Disconnect if slow. Only the first consumer's
* disconnectSlow value is used; subsequent consumers added to this
* EngineManagement object share the connection and thus the disconnect
* behavior. Default is false.
* @param mode the connection mode to use,
* defaults to legacy (single connection, all
* events delivered to the default
* channel). Set to CONNECT_PARALLEL for
* connection per channel and channel values
* passed through.
* @return true if successful
* @exception EngineException
*/
virtual bool attachAsEventConsumerTo(
EngineManagement* target, const char* const* channels,
bool disconnectSlow=false, ConnectMode mode = CONNECT_LEGACY) = 0;
/**
* Unsubscribe as an event receiver from another engine.
*
* @param target The Correlator to unsubscribe from.
* @param channels An array of names representing the channels to
* unsubscribe from. This is a null-terminated array of pointers to
* zero-terminated char arrays. If this is null or empty unsubscribe
* from all channels.
* @param mode the connection mode to use,
* defaults to legacy (single connection, all
* events delivered to the default
* channel). Set to CONNECT_PARALLEL for
* connection per channel and channel values
* passed through.
* @exception EngineException
*/
virtual void detachAsEventConsumerFrom(
EngineManagement* target, const char* const* channels,
ConnectMode mode = CONNECT_LEGACY) = 0;
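Both methods expect channels as a null-terminated array of pointers to zero-terminated strings. The following is a minimal sketch of one way to build such an array from a list of channel names. The helper name makeChannelArray and the variable names in the usage comment are assumptions made for illustration; they are not part of the SDK.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of the Apama SDK): builds the
// null-terminated array of C strings expected by the channels parameter.
// The returned pointers refer to the strings held in `names`, so `names`
// must outlive the returned vector.
std::vector<const char*> makeChannelArray(const std::vector<std::string>& names)
{
    std::vector<const char*> out;
    out.reserve(names.size() + 1);
    for (std::size_t i = 0; i < names.size(); ++i) {
        out.push_back(names[i].c_str());
    }
    out.push_back(NULL);            // terminator required by the API
    assert(out.back() == NULL);     // postcondition: always terminated
    return out;
}

// Assumed usage with the SDK (not compiled here; `consumer` and `source`
// would be EngineManagement pointers obtained elsewhere):
//   std::vector<const char*> ch = makeChannelArray(names);
//   consumer->attachAsEventConsumerTo(source, &ch[0], false, CONNECT_PARALLEL);
```

An empty list of names yields an array that holds only the terminator, which the documentation above describes as subscribing to all channels.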
The following sample code illustrates how to connect two correlators in a pipelined arrangement. The important library calls are connectToEngine, attachAsEventConsumerTo, detachAsEventConsumerFrom, and disconnectFromEngine.
// Method that connects to 2 ‘Engines’ (Correlators) and links them up
// in a pipeline
void connect(const char* host1, unsigned short port1,
const char* host2, unsigned short port2,
vector<const char*> channels, bool detach,
int& rc )
{
const char* emsg = "";
EngineManagement* engine1 = NULL;
EngineManagement* engine2 = NULL;
try {
// a bit verbose
cerr << "Requesting connection from host:" << host1
<< ", port:" << port1 << " to host:" << host2
<< ", port:" << port2 << endl;
// Ensure valid arguments
rc = 3; // this just sets a return value
// Compare host names by content, not by pointer (requires <cstring>)
if ((std::strcmp(host1, host2) == 0) && (port1 == port2)) {
cerr << "Connecting an engine to itself is disallowed"
<< endl;
return;
}
// Make sure the channel list is NULL-terminated
channels.push_back(NULL);
// Attempt to connect to source Correlator
rc = 1;
emsg = "Failed to connect to source engine";
if (!(engine1 =
com::apama::engine::connectToEngine(host1, port1))) {
throw EngineException(emsg);
}
// Attempt to connect to target Correlator
rc = 2;
emsg = "Failed to connect to target engine";
if (!(engine2 =
com::apama::engine::connectToEngine(host2, port2))) {
throw EngineException(emsg);
}
// Connect target to source
if (detach) {
emsg = "Detach failed";
engine2->detachAsEventConsumerFrom(engine1,&channels[0],CONNECT_LEGACY);
}
else {
emsg = "Attach failed";
if (
!engine2->attachAsEventConsumerTo(engine1,&channels[0],false,CONNECT_LEGACY))
{
rc = 4;
emsg = "Target engine could not connect to source engine";
throw EngineException(emsg);
}
}
}
catch (const EngineException& ex) {
string errmes;
errmes += emsg;
errmes += ": ";
errmes += ex.what();
throw EngineException(errmes.c_str());
}
// Shutdown cleanly
rc = 5;
if (engine1) {
// Disconnect from Apama
emsg = "Failed to disconnect from source engine";
com::apama::engine::disconnectFromEngine(engine1);
}
if (engine2) {
// Disconnect from Apama
emsg = "Failed to disconnect from target engine";
com::apama::engine::disconnectFromEngine(engine2);
}
}
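In the sample above, the catch block rethrows, so the disconnect calls that follow it are skipped whenever an error occurs. One possible way to avoid this is to hold each connection in a small guard object that disconnects in its destructor. The sketch below illustrates the idea with stand-in types so that it compiles without the SDK: FakeEngine, fakeDisconnect, and EngineGuard are hypothetical names, and it assumes disconnectFromEngine can be called with a single engine pointer, as in the sample.

```cpp
#include <cstddef>
#include <stdexcept>

// Stand-in for com::apama::engine::EngineManagement (illustration only).
struct FakeEngine { };

// Counts cleanup calls so the behavior can be observed.
static int g_disconnects = 0;

// Stand-in for com::apama::engine::disconnectFromEngine (illustration only).
void fakeDisconnect(FakeEngine* e) { if (e) ++g_disconnects; }

// Hypothetical guard: owns one connection and disconnects it on scope exit.
template <typename Engine, void (*Disconnect)(Engine*)>
class EngineGuard {
public:
    explicit EngineGuard(Engine* e = NULL) : engine_(e) {}
    ~EngineGuard() { if (engine_) Disconnect(engine_); }
    Engine* get() const { return engine_; }
    Engine* operator->() const { return engine_; }
private:
    EngineGuard(const EngineGuard&);             // non-copyable
    EngineGuard& operator=(const EngineGuard&);  // non-assignable
    Engine* engine_;
};

// Shows that both connections are released even though an exception
// leaves the try block before any explicit cleanup code is reached.
int disconnectsAfterFailure()
{
    g_disconnects = 0;
    FakeEngine a, b;
    try {
        EngineGuard<FakeEngine, fakeDisconnect> source(&a);
        EngineGuard<FakeEngine, fakeDisconnect> target(&b);
        throw std::runtime_error("attach failed");
    } catch (const std::runtime_error&) {
        // Both guards have already run their destructors at this point.
    }
    return g_disconnects;
}
```

With the real SDK, the same pattern would presumably be instantiated with EngineManagement and disconnectFromEngine in place of the stand-ins, provided the actual signature matches the assumption stated above.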