Publishing Protobuf Events to a Channel
The Universal Messaging Java client API supports publishing protobuf events to a store. For more information about using Google Protocol Buffers with Universal Messaging, see
Google Protocol Buffers.
You can use the following example to publish protobuf events to a Universal Messaging channel. The example contains the following code samples:
- A sample that publishes protobuf events to a channel. The event is filled with sample data, which is then consumed and parsed on the subscriber side. The sample uses a predefined file named umTeam.proto.
- The contents of the predefined umTeam.proto file, which you must compile into a descriptor file using the following command:
protoc --descriptor_set_out=<path_to_file>/umTeam.descriptor <path_to_file>/umTeam.proto
After you generate the umTeam.descriptor file, you must set it as a protobuf descriptor on the channel used in the sample below. You can do this while creating or editing the channel; the sample itself sets the descriptor programmatically when it creates the channel.
Sample that Publishes Protobuf Events
/**
 * Sample that loads the {@code umTeam} protobuf descriptor, creates a channel carrying that
 * descriptor, builds a nested {@code UniversalMessaging} message with {@link DynamicMessage},
 * subscribes to the channel, and publishes the message as a protobuf event.
 */
public class ProtobufSample {

    public static final String TEAM1 = "Team1";
    public static final String TEAMNAME1 = "INDIA";
    public static final String TEAM2 = "Team2";
    public static final String TEAMNAME2 = "USA";
    public static final String TEAM3 = "Team3";
    public static final String TEAMNAME3 = "BULGARIA";
    public static final String TEAMS = "Teams";
    public static final String TEAMLEAD1 = "John";
    public static final String TEAMLEAD2 = "Richard";
    public static final String TEAMLEAD3 = "Frank";
    public static final String TEAMLEADFIELD = "TeamLead";
    public static final String _EMAIL = "_email";
    public static final String URL = "url";
    public static final String PRODUCTNAME = "productName";
    public static final String UM = "Universal Messaging";
    public static final String UMMESSAGE = "UniversalMessaging";

    /**
     * Creates a builder for a {@code TeamLead} message.
     *
     * @param name      value stored in the {@code teamLeadName} field
     * @param desc      file descriptor in which the {@code TeamLead} type is looked up
     * @param employees builder whose built message is stored in the {@code employees} field
     * @return the populated {@code TeamLead} builder
     */
    public static DynamicMessage.Builder buildTeamLead(String name, FileDescriptor desc,
            DynamicMessage.Builder employees) {
        Descriptor leadType = desc.findMessageTypeByName(TEAMLEADFIELD);
        DynamicMessage.Builder lead = DynamicMessage.newBuilder(leadType);
        lead.setField(leadType.findFieldByName("teamLeadName"), name);
        lead.setField(leadType.findFieldByName("employees"), employees.build());
        return lead;
    }

    /**
     * Creates a builder for one of the team messages.
     *
     * @param teamName        value stored in the {@code teamName} field
     * @param teamFieldName   name of the message type to instantiate (for example {@code Team1})
     * @param desc            file descriptor in which the team type is looked up
     * @param teamLeadBuilder builder whose built message is stored in the {@code TeamLead} field
     * @param emailBuilder    builder whose built message is stored in the {@code _email} field
     * @return the populated team builder
     */
    public static DynamicMessage.Builder buildTeam(String teamName, String teamFieldName,
            FileDescriptor desc, DynamicMessage.Builder teamLeadBuilder,
            DynamicMessage.Builder emailBuilder) {
        Descriptor teamType = desc.findMessageTypeByName(teamFieldName);
        DynamicMessage.Builder team = DynamicMessage.newBuilder(teamType);
        team.setField(teamType.findFieldByName("teamName"), teamName);
        team.setField(teamType.findFieldByName("TeamLead"), teamLeadBuilder.build());
        team.setField(teamType.findFieldByName("_email"), emailBuilder.build());
        return team;
    }

    /**
     * Creates a builder for a {@code url} message.
     *
     * @param url  value stored in the {@code url} field
     * @param desc file descriptor in which the {@code url} type is looked up
     * @return the populated {@code url} builder
     */
    public static DynamicMessage.Builder buildUrl(String url, FileDescriptor desc) {
        Descriptor urlType = desc.findMessageTypeByName(URL);
        DynamicMessage.Builder urlMessage = DynamicMessage.newBuilder(urlType);
        urlMessage.setField(urlType.findFieldByName(URL), url);
        return urlMessage;
    }

    /**
     * Creates a builder for an {@code _email} message.
     *
     * @param urlBuilder builder whose built message is stored in the {@code url} field
     * @param desc       file descriptor in which the {@code _email} type is looked up
     * @return the populated {@code _email} builder
     */
    public static DynamicMessage.Builder buildEmail(DynamicMessage.Builder urlBuilder,
            FileDescriptor desc) {
        Descriptor emailType = desc.findMessageTypeByName(_EMAIL);
        DynamicMessage.Builder email = DynamicMessage.newBuilder(emailType);
        email.setField(emailType.findFieldByName(URL), urlBuilder.build());
        return email;
    }

    /**
     * Reads every descriptor file, merges its bytes into {@code descriptorSet}, and wraps it in a
     * {@code ProtobufDescriptor} named after the last path segment of the file name.
     */
    private static List<ProtobufDescriptor> loadDescriptors(String[] fileNames,
            FileDescriptorSet.Builder descriptorSet) throws Exception {
        List<ProtobufDescriptor> loaded = new ArrayList<>();
        for (int i = 0; i < fileNames.length; i++) {
            String relativePath = fileNames[i];
            // Paths are resolved against the PROJECT_ROOT_FOLDER system property ("..": default).
            File source = new File(System.getProperty("PROJECT_ROOT_FOLDER", "..") + relativePath);
            byte[] content = fFile.readAllBytes(source);
            descriptorSet.mergeFrom(content);
            String[] segments = relativePath.split("/");
            loaded.add(new ProtobufDescriptor(segments[segments.length - 1], content));
        }
        return loaded;
    }

    /** Builds one complete team message: team lead, e-mail (wrapping a url), and team name. */
    private static DynamicMessage.Builder assembleTeam(String teamName, String teamType,
            String leadName, String address, FileDescriptor desc,
            DynamicMessage.Builder employees) {
        DynamicMessage.Builder lead = ProtobufSample.buildTeamLead(leadName, desc, employees);
        DynamicMessage.Builder url = ProtobufSample.buildUrl(address, desc);
        DynamicMessage.Builder email = ProtobufSample.buildEmail(url, desc);
        return ProtobufSample.buildTeam(teamName, teamType, desc, lead, email);
    }

    /**
     * Connects to the realm, creates the channel with the protobuf descriptor attached,
     * registers a subscriber that prints every field of each received message, and publishes
     * one {@code UniversalMessaging} message.
     *
     * @param args not used
     * @throws Exception if any of the session, file, descriptor, or channel operations fails
     */
    public static void main(String[] args) throws Exception {
        String[] realmUrls = {"nsp://localhost:11000"};
        nSession session = nSessionFactory.create(new nSessionAttributes(realmUrls));
        session.init();

        String[] fileNames = {"/build/change-management/test/protobuf/umTeam.descriptor"};
        FileDescriptorSet.Builder descriptorSet = DescriptorProtos.FileDescriptorSet.newBuilder();
        List<ProtobufDescriptor> descriptors = loadDescriptors(fileNames, descriptorSet);

        nChannelAttributes channelAttributes = new nChannelAttributes("channel");
        channelAttributes.setProtobufDescriptors(descriptors);
        nChannel chan = session.createChannel(channelAttributes);

        FileDescriptor fileDescriptor =
                FileDescriptor.buildFrom(descriptorSet.getFile(0), new FileDescriptor[0]);

        // The same employees builder is shared by all three team leads.
        Descriptor employeesType = fileDescriptor.findMessageTypeByName("employees");
        DynamicMessage.Builder employees = DynamicMessage.newBuilder(employeesType);
        employees.setField(employeesType.findFieldByName("numberOfEmployees"), 5);

        DynamicMessage.Builder team1 = assembleTeam(TEAMNAME1, TEAM1, TEAMLEAD1,
                "teamIndia@softwareag.com", fileDescriptor, employees);
        DynamicMessage.Builder team2 = assembleTeam(TEAMNAME2, TEAM2, TEAMLEAD2,
                "teamUSA@softwareag.com", fileDescriptor, employees);
        DynamicMessage.Builder team3 = assembleTeam(TEAMNAME3, TEAM3, TEAMLEAD3,
                "teamBulgaria@softwareag.com", fileDescriptor, employees);

        Descriptor teamsType = fileDescriptor.findMessageTypeByName(TEAMS);
        DynamicMessage.Builder teams = DynamicMessage.newBuilder(teamsType);
        teams.setField(teamsType.findFieldByName(TEAM1), team1.build());
        teams.setField(teamsType.findFieldByName(TEAM2), team2.build());
        teams.setField(teamsType.findFieldByName(TEAM3), team3.build());

        DynamicMessage.Builder productUrl =
                ProtobufSample.buildUrl("UniversalMessaging@softwareag.com", fileDescriptor);
        DynamicMessage.Builder productEmail = ProtobufSample.buildEmail(productUrl, fileDescriptor);

        Descriptor rootType = fileDescriptor.findMessageTypeByName(UMMESSAGE);
        DynamicMessage.Builder rootBuilder = DynamicMessage.newBuilder(rootType);
        rootBuilder.setField(rootType.findFieldByName(TEAMS), teams.build());
        rootBuilder.setField(rootType.findFieldByName(PRODUCTNAME), UM);
        rootBuilder.setField(rootType.findFieldByName(_EMAIL), productEmail.build());
        DynamicMessage payload = rootBuilder.build();

        nEventListener printer = new nEventListener() {
            @Override
            public void go(nConsumeEvent evt) {
                // Events with ID -2 are skipped without parsing or acknowledging them.
                if (evt.getEventID() == -2) {
                    return;
                }
                try {
                    DynamicMessage received = DynamicMessage.parseFrom(rootType, evt.getEventData());
                    System.out.println("<<<<<<<<<<<<<<<"+ rootType.getName() + ">>>>>>>>>>>>>>>");
                    for (Entry<FieldDescriptor,Object> field : received.getAllFields().entrySet()) {
                        System.out.println("Name of the attribute: " + field.getKey().getName() + "\n\n");
                        System.out.println(field.getValue().toString());
                        System.out.println("============================");
                    }
                    // Acknowledge only after the whole message has been parsed and printed.
                    evt.ack();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // Subscribe before publishing so the listener is registered when the event is sent.
        chan.addSubscriber(printer);

        String messageTypeName = rootBuilder.getDescriptorForType().getName();
        nProtobufEvent event = new nProtobufEvent(payload.toByteArray(), messageTypeName);
        chan.publish(event);
    }
}
umTeam.proto File
// Root message of the sample. The Java sample looks this type up by the name
// "UniversalMessaging", fills all three fields, and publishes the result.
// NOTE(review): no `syntax` statement appears in this listing; the `optional`
// labels suggest proto2 - confirm against the real file.
message UniversalMessaging {
// Container holding the three team sub-messages.
optional Teams Teams = 1;
// Product name; the sample sets it to "Universal Messaging".
optional string productName = 2;
// E-mail sub-message for the product as a whole.
optional _email _email = 3;
}
// E-mail wrapper: carries a single nested `url` message.
message _email {
// Nested message holding the address string.
optional url url = 1;
}
// Leaf message holding one string; the sample stores e-mail addresses in it.
message url {
// The address text itself.
optional string url = 1;
}
// Container for the three teams; each team has its own message type.
message Teams {
optional Team1 Team1 = 1;
optional Team2 Team2 = 2;
optional Team3 Team3 = 3;
}
// First team. Has the same three fields as Team2 and Team3.
message Team1 {
// Team name; the sample sets it to "INDIA".
optional string teamName = 1;
// Team lead sub-message.
optional TeamLead TeamLead = 2;
// E-mail sub-message for the team.
optional _email _email = 3;
}
// Second team. Has the same three fields as Team1 and Team3.
message Team2 {
// Team name; the sample sets it to "USA".
optional string teamName = 1;
// Team lead sub-message.
optional TeamLead TeamLead = 2;
// E-mail sub-message for the team.
optional _email _email = 3;
}
// Third team. Has the same three fields as Team1 and Team2.
message Team3 {
// Team name; the sample sets it to "BULGARIA".
optional string teamName = 1;
// Team lead sub-message.
optional TeamLead TeamLead = 2;
// E-mail sub-message for the team.
optional _email _email = 3;
}
// Team lead: a name plus a nested `employees` message.
message TeamLead {
// Name of the team lead (for example "John" in the sample).
optional string teamLeadName = 1;
// Nested message carrying the employee count.
optional employees employees = 2;
}
// Employee information attached to a team lead.
message employees {
// Employee count; the sample sets it to 5 for every team lead.
optional int32 numberOfEmployees = 1;
}