The Java example
The Java example is effectively identical to the C++ example described in the previous topic. Its source file, JavaExample.java, is located in the samples\engine_client\java folder of the Apama installation.
Building and running the example is easy. Full instructions are available in the README.txt contained in the same folder.
//*
/*
* JavaExample.java
*
* This simple example illustrates how to use the Java SDK to
* interface with Apama. The example connects to a remote Event Correlator
* (also known as a Correlation Engine or just the Engine), creates
* an event consumer and connects it to the Engine's supplier interface,
* creates some MonitorScript code and injects it, injects some sample events,
* and receives some back from the Engine when the monitor triggers. It then
* disconnects from the Engine and exits.
*
* $Copyright(c) 2013 Software AG, Darmstadt, Germany and/or its licensors$
*
* $RCSfile$ $Revision$ $Date$
*/
import com.apama.engine.EngineManagementFactory;
import com.apama.engine.EngineManagement;
import com.apama.engine.EngineStatus;
import com.apama.engine.MonitorScript;
import com.apama.event.EventConsumer;
import com.apama.event.EventSupplier;
import com.apama.event.Event;
import com.apama.EngineException;
/**
 * Event consumer that prints every event delivered by the Engine to
 * standard output.
 */
class ReceiveConsumer implements EventConsumer {

    /** Handle the Engine returned when this consumer was connected. */
    private EventSupplier supplier;

    /**
     * Connects this object to the given Engine as an event consumer and
     * remembers the handle that the Engine hands back.
     *
     * @param engine The Event Correlator to connect to.
     *
     * @exception EngineException if the Engine rejects the connection.
     */
    public ReceiveConsumer(EngineManagement engine) throws EngineException {
        // An empty String array is passed as the second argument.
        final String[] none = new String[] {};
        this.supplier = engine.connectEventConsumer(this, none);
    }

    /**
     * Callback through which events arrive; each one is written to stdout
     * on its own line. A null array is silently ignored.
     *
     * @param events The received events (may be null).
     *
     * @exception EngineException declared by the consumer interface.
     */
    public void sendEvents(Event[] events) throws EngineException {
        if (events == null) {
            return;
        }
        for (Event received : events) {
            System.out.println(received);
        }
    }

    /**
     * Disconnects this consumer from the Engine. Java has no destructors,
     * so callers must invoke this explicitly when they are finished.
     *
     * @exception EngineException if the disconnect call fails.
     */
    public void deregister() throws EngineException {
        if (supplier == null) {
            return;
        }
        supplier.disconnect();
    }
} // non-public class ReceiveConsumer
/**
 * Main program.
 *
 * Connects to a remote Engine, registers an event consumer, injects a small
 * MonitorScript monitor, sends two sample events, removes what it injected,
 * and disconnects again. Engine status is printed before and after.
 *
 * Return codes:
 * 0 = Everything OK
 * 1 = Couldn't connect to Engine
 * 2 = Something else went wrong (including a bad command line)
 */
public class JavaExample {

    /** Pause, in milliseconds, that gives the Engine time to process requests. */
    private static final long SETTLE_MILLIS = 3000;

    /**
     * Program entry point. Always terminates the JVM via System.exit with
     * one of the return codes documented on this class.
     *
     * @param argv Exactly two arguments: the Engine host name and a
     *             positive port number.
     */
    public static void main(String[] argv) {
        // Return code
        int rc = 2;

        // Error message to display if anything goes wrong. Update this
        // appropriately before each operation that might break.
        String emsg = null;

        // A port that is missing, non-numeric or not positive is reported
        // through the usage message rather than an uncaught exception.
        int port = (argv.length == 2) ? parsePort(argv[1]) : -1;

        if (port > 0) {
            // The engine
            EngineManagement engine = null;

            try {
                // Attempt to connect to a remote Engine. Only a failure of
                // this step may be reported as return code 1.
                rc = 1;
                emsg = "Failed to connect to engine";
                engine = EngineManagementFactory.connectToEngine(argv[0], port);

                // Connected: any failure from here on is "something else".
                rc = 2;

                // Create an event consumer
                emsg = "Event sink connection failed";
                ReceiveConsumer consumer = new ReceiveConsumer(engine);

                // Inject some MonitorScript (it is removed again below
                // with deleteName)
                emsg = "MonitorScript injection failed";
                MonitorScript script = new MonitorScript(
                    "event TestEvent {" + "\n" +
                    "string text;" + "\n" +
                    "}" + "\n" +
                    "" + "\n" +
                    "monitor Echo {" + "\n" +
                    "" + "\n" +
                    "TestEvent test;" + "\n" +
                    "" + "\n" +
                    "action onload {" + "\n" +
                    "on all TestEvent(*):test {" + "\n" +
                    "emit TestEvent(test.text);" + "\n" +
                    "}" + "\n" +
                    "}" + "\n" +
                    "}");
                engine.injectMonitorScript(script);

                // Wait a few seconds to be sure the injection event has
                // been processed
                Thread.sleep(SETTLE_MILLIS);

                // Get & display status
                emsg = "Status gathering failed";
                EngineStatus status = engine.getStatus();
                System.out.println(status);

                // Send some events (In the Java API we do NOT need to null
                // terminate the array)
                emsg = "Event sending failed";
                Event[] events = new Event[2];
                events[0] = new Event("TestEvent(\"Hello, World\")");
                events[1] = new Event("TestEvent(\"Welcome to Apama\")");
                engine.sendEvents(events);

                // Delete the event type and monitor we added
                emsg = "Name deletion failed";
                engine.deleteName("Echo");
                engine.deleteName("TestEvent");

                // Wait a few seconds for the output event to be received
                // and the deletions processed
                Thread.sleep(SETTLE_MILLIS);

                // Display status again
                emsg = "Status gathering failed";
                System.out.println();
                status = engine.getStatus();
                System.out.println(status);

                // Disconnect and destroy the event consumer
                emsg = "Event sink disconnection failed";
                consumer.deregister();

                // If we got this far, everything succeeded!
                rc = 0;
            }
            catch (EngineException ex) {
                System.err.println(emsg + ": " + ex);
            }
            catch (Throwable t) {
                // Keep the interrupt visible to anything that runs before
                // the JVM exits.
                if (t instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Report the real cause instead of discarding it.
                System.err.println(emsg
                    + ": Caught non-engine exception in main(): " + t);
            }
        }
        else {
            // Bad command line given
            System.out.println("Usage: java JavaExample <host> <port>");
        }

        // Done!
        System.exit(rc);
    } // main()

    /**
     * Parses a port number without throwing.
     *
     * @param text The command-line text to parse.
     * @return The parsed value, or -1 when the text is not a valid integer.
     */
    private static int parsePort(String text) {
        try {
            return Integer.parseInt(text);
        }
        catch (NumberFormatException notANumber) {
            return -1;
        }
    }
} // class JavaExample
Copyright © 2013
Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or Terracotta Inc., San Francisco, CA, USA, and/or Software AG (Canada) Inc., Cambridge, Ontario, Canada, and/or, Software AG (UK) Ltd., Derby, United Kingdom, and/or Software A.G. (Israel) Ltd., Or-Yehuda, Israel and/or their licensors.