Universal Messaging Brokerless API
Universal Messaging offers, in addition to its standard full-featured client-server API, an extremely lightweight client-client communication API known as the Brokerless API.
Broker-based Model
Historically, messaging architecture has predominantly been based on a 'broker in the middle' approach, often referred to as 'hub and spoke'. The broker acts as the communications hub, routing messages between logically decoupled peers.
The pub-sub model is a common paradigm for broker-based architectures: one or more publishers send messages to the broker, which then distributes them to interested consumers.
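The hub-and-spoke routing described above can be illustrated with a minimal in-process sketch. The BrokerSketch class and its subscribe and publish methods are hypothetical names chosen for illustration; they are not part of the Universal Messaging API, and a real broker would add networking, persistence and transactional semantics.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-process stand-in for a broker; not a Universal Messaging class.
class BrokerSketch {
    // Topic name -> consumers that registered interest in that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // A consumer registers its interest with the broker, not with any publisher.
    public void subscribe(String topic, Consumer<String> consumer) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumer);
    }

    // A publisher hands the message to the broker, which fans it out.
    // Returns the number of consumers the message was delivered to.
    public int publish(String topic, String message) {
        List<Consumer<String>> interested =
                subscribers.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> consumer : interested) {
            consumer.accept(message);
        }
        return interested.size();
    }

    public static void main(String[] args) {
        BrokerSketch broker = new BrokerSketch();
        broker.subscribe("prices", m -> System.out.println("Consumer A received: " + m));
        broker.subscribe("prices", m -> System.out.println("Consumer B received: " + m));
        broker.publish("prices", "EUR/USD 1.0850");
    }
}
```

Note that the publisher never references a consumer directly: every message makes one hop to the broker and a second hop from the broker to each consumer.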
Brokerless Model
The brokerless model is a peer-to-peer model in which peers know how to communicate directly with one another rather than through a broker. In effect, each publisher peer acts like a server, and each consumer communicates directly with the publishers.
While this model bypasses broker messaging functionality such as persistence or transactional semantics, it delivers information from a publisher to a consumer with considerably lower latency. Halving the number of "hops" between publisher and consumer (one direct hop instead of two via the broker) effectively halves latency as well. This is especially useful when ultra-low-latency message delivery is paramount, for example in the links between pricing, quant and risk engines in FX trading platforms.
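The hop-count argument can be made concrete with a small worked calculation. This sketch assumes, purely for illustration, that every network hop costs the same and that the broker adds no processing time of its own; the 50 microsecond per-hop figure is a made-up value, not a measurement of Universal Messaging.

```java
// Illustrative arithmetic only; the per-hop cost is a hypothetical figure.
class HopLatencySketch {
    // End-to-end latency when every hop is assumed to cost the same.
    static double endToEndMicros(int hops, double perHopMicros) {
        return hops * perHopMicros;
    }

    public static void main(String[] args) {
        double perHopMicros = 50.0; // assumed cost of a single hop

        // Broker-based: publisher -> broker -> consumer (two hops)
        double viaBroker = endToEndMicros(2, perHopMicros);
        // Brokerless: publisher -> consumer (one hop)
        double direct = endToEndMicros(1, perHopMicros);

        System.out.println("Via broker: " + viaBroker + " microseconds");
        System.out.println("Direct:     " + direct + " microseconds");
    }
}
```

Under these assumptions the direct path takes half as long as the brokered one. In practice the ratio depends on the transport used and on any work the broker performs per message.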
The Brokerless API is currently only available for Java clients. It is located in the com.softwareag.um.io package.
The API is very simple, allowing each client to accept connections from other clients, and to receive arbitrary binary data from these clients synchronously or asynchronously. In many ways the API is similar to a standard TCP socket API, but offers the additional benefit of being able to use not just TCP sockets as a communication transport, but any of the following Universal Messaging communication technologies:
* TCP Sockets: data is transmitted directly over TCP Sockets
* SSL: data is SSL encrypted then transmitted over TCP Sockets
* SHM: data is transmitted via Shared Memory (for near-instant access by processes on the same machine)
* RDMA: data is transmitted via Remote Direct Memory Access (for access by processes on a remote machine; requires network adapters that support RDMA)
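The samples in this section select a transport from the protocol part of a protocol://host:port URL. As a rough sketch of that idea, the following standalone helper maps a URL to one of the four transports listed above using only java.net.URI. The TransportUrlSketch class, its Kind enum and the scheme names "tcp", "ssl", "shm" and "rdma" are assumptions made for illustration; the protocol strings actually accepted by the Brokerless API may differ.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper; not part of the Universal Messaging API.
class TransportUrlSketch {
    enum Kind { TCP, SSL, SHM, RDMA }

    // Maps the protocol part of the URL to a transport kind.
    // The scheme names used here are assumed for illustration.
    static Kind kindOf(String url) {
        String scheme = URI.create(url).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("No protocol in URL: " + url);
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "tcp":  return Kind.TCP;
            case "ssl":  return Kind.SSL;
            case "shm":  return Kind.SHM;
            case "rdma": return Kind.RDMA;
            default:
                throw new IllegalArgumentException("Unsupported protocol: " + scheme);
        }
    }

    // Host and port are taken from the authority part of the URL.
    static String hostOf(String url) {
        return URI.create(url).getHost();
    }

    static int portOf(String url) {
        return URI.create(url).getPort();
    }

    public static void main(String[] args) {
        String url = "tcp://localhost:9000";
        System.out.println(kindOf(url) + " " + hostOf(url) + " " + portOf(url));
    }
}
```

Keeping the transport choice in the URL means that application code stays the same when moving, for example, from TCP to shared memory; only the URL passed on the command line changes.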
Let's take a quick look at how to use this API. Here is an example "echo" client and server; the EchoClient will write a string to the EchoServer; the EchoServer will respond to the EchoClient.
Here's the EchoClient:
package com.softwareag.um.io.samples.echo;

import com.softwareag.um.io.ClientContextBuilderFactory;
import com.softwareag.um.io.ClientTransportContext;
import com.softwareag.um.io.SynchronousTransport;
import com.softwareag.um.io.TransportFactory;
import com.softwareag.um.io.samples.SimpleMessage;
import com.softwareag.um.io.samples.SynchronousClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * This sample app simply writes a string entered into the console to an EchoServer.
 * The EchoServer will respond and this response will be output on the console.
 */
public class EchoClient {

    public EchoClient(String url) throws IOException {
        //Use the factory to generate the required builder based on the protocol in the url string
        ClientTransportContext context = ClientContextBuilderFactory.getBuilder(url).build();
        //We do not pass any handlers to the connect method because we want a synchronous transport
        SynchronousTransport transport = TransportFactory.connect(context);
        //This is just a basic wrapper for the client transport so it is easier to read/write messages
        SynchronousClient<SimpleMessage> client = new SynchronousClient<>(transport);
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        //Start a new thread to read from the client transport because read is a blocking call
        new ReadThread(client);
        //Now continue to write messages to the EchoServer until the user enters 'quit'
        while(true){
            System.out.println("Enter a message or type 'quit' to exit >");
            String line = br.readLine();
            //readLine() returns null at end of input, so treat that the same as 'quit'
            if(line == null || line.equalsIgnoreCase("quit")){
                break;
            }
            else{
                client.write(new SimpleMessage(line));
            }
        }
    }

    private static class ReadThread extends Thread{
        SynchronousClient<SimpleMessage> client;

        public ReadThread(SynchronousClient<SimpleMessage> _client){
            client = _client;
            //Daemon thread, so a blocked read does not keep the JVM alive after 'quit'
            setDaemon(true);
            start();
        }

        @Override
        public void run(){
            try{
                while(true){
                    SimpleMessage mess = client.read(new SimpleMessage());
                    System.out.println(mess.toString());
                }
            }
            catch(Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if(args.length == 0){
            usage();
            System.exit(1);
        }
        new EchoClient(args[0]);
    }

    public static void usage(){
        System.out.println("EchoClient <URL>");
        System.out.println("<Required parameters>");
        System.out.println("\tURL - protocol://host:port for the server to connect to e.g. "+TransportFactory.SOCKET+"://localhost:9000");
    }
}
And here is the EchoServer:
package com.softwareag.um.io.samples.echo;

import com.softwareag.um.io.ServerContextBuilderFactory;
import com.softwareag.um.io.ServerTransportContext;
import com.softwareag.um.io.SynchronousServerTransport;
import com.softwareag.um.io.SynchronousTransport;
import com.softwareag.um.io.TransportFactory;
import com.softwareag.um.io.samples.SimpleMessage;
import com.softwareag.um.io.samples.SynchronousClient;
import java.io.IOException;

/**
 * This sample will only handle one client connection at a time. When a client connects,
 * the EchoServer will immediately respond to any messages with exactly the same message.
 */
public class EchoServer implements Runnable{
    private volatile SynchronousClient<SimpleMessage> client;
    private final SynchronousServerTransport transport;
    private volatile boolean stopped = false;

    public EchoServer(String url) throws IOException {
        //The factory will create the correct context based on the protocol in the url
        ServerTransportContext context = ServerContextBuilderFactory.getBuilder(url).build();
        //Because we have not passed an AcceptHandler into the bind method, we are returned
        //a SynchronousServerTransport. This means we have to call accept on the transport
        //to accept new client transports.
        transport = TransportFactory.bind(context);
    }

    public static void main(String[] args) throws IOException {
        if(args.length == 0){
            usage();
            System.exit(1);
        }
        EchoServer echoServer = new EchoServer(args[0]);
        Thread t = new Thread(echoServer);
        t.start();
        System.out.println("Press enter to quit.");
        System.in.read();
        echoServer.close();
    }

    public static void usage(){
        System.out.println("EchoServer <URL>");
        System.out.println("<Required parameters>");
        System.out.println("\tURL - protocol://host:port to bind the server transport to e.g. "+TransportFactory.SOCKET+"://localhost:9000");
    }

    protected void close(){
        stopped = true;
        //client is still null if no client has connected yet
        SynchronousClient<SimpleMessage> current = client;
        if(current != null){
            current.close();
        }
        transport.close();
    }

    @Override
    public void run() {
        try{
            while(!stopped){
                System.out.println("Waiting for client");
                //accept() will block until a client makes a connection to our server
                SynchronousTransport clientTransport = transport.accept();
                System.out.println("Client connected. Echo service started.");
                //The SynchronousClient is simply a wrapper to make reading/writing easier
                client = new SynchronousClient<>(clientTransport);
                try{
                    while(!stopped){
                        client.write(client.read(new SimpleMessage()));
                    }
                }
                catch (IOException e){
                    System.out.println("Connection closed");
                }
            }
        }
        catch(IOException e){
            //An IOException is expected here once close() has shut the transport down
            if(!stopped){
                e.printStackTrace();
            }
        }
    }
}
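For comparison with the standard TCP socket API that the Brokerless API is said to resemble, here is a minimal sketch of the same echo exchange written with plain java.net sockets. It uses only JDK classes and none of the Universal Messaging API; the PlainSocketEcho class and its method names are hypothetical, and writeUTF/readUTF stand in for whatever message framing the sample's SimpleMessage wrapper performs.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Plain JDK sockets only; not a Universal Messaging class.
class PlainSocketEcho {

    // Accepts a single client and echoes every string back until it disconnects.
    static void serveOneClient(ServerSocket server) throws IOException {
        try (Socket client = server.accept()) { // blocks until a client connects
            DataInputStream in = new DataInputStream(client.getInputStream());
            DataOutputStream out = new DataOutputStream(client.getOutputStream());
            while (true) {
                out.writeUTF(in.readUTF());
                out.flush();
            }
        } catch (EOFException e) {
            // The client closed its end of the connection; nothing more to echo.
        }
    }

    // Connects to the echo server, sends one string and returns the reply.
    static String roundTrip(int port, String text) throws IOException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            DataInputStream in = new DataInputStream(socket.getInputStream());
            out.writeUTF(text);
            out.flush();
            return in.readUTF(); // blocks until the echo arrives
        }
    }

    // Runs server and client in one process on an ephemeral loopback port.
    static String demo(String text) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> {
                try {
                    serveOneClient(server);
                } catch (IOException e) {
                    System.err.println("Echo server stopped: " + e.getMessage());
                }
            });
            serverThread.setDaemon(true);
            serverThread.start();
            return roundTrip(server.getLocalPort(), text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Echoed: " + demo("hello"));
    }
}
```

The overall shape (bind, accept, blocking read, write) matches the EchoServer and EchoClient samples above. The difference is that this version is tied to TCP, whereas the Brokerless API samples choose the transport from the URL.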
Copyright © 2013-2014 Software AG, Darmstadt, Germany.
