Custom Serializer
 
How to integrate a custom serializer/deserializer in Adapter for Apache Kafka
Kafka stores and transports byte arrays in its topics. To work with Java objects instead of raw byte arrays, Kafka provides interfaces for converting objects to and from bytes.
To implement a serializer, create a class that implements the org.apache.kafka.common.serialization.Serializer interface.
To implement a deserializer, create a class that implements the org.apache.kafka.common.serialization.Deserializer interface.
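A serializer or deserializer implements the following methods:
* configure(): receives the client configuration and a flag that indicates whether the instance handles keys or values.
* close(): releases any resources held by the instance.
* serialize() / deserialize(): converts an object to a byte array, or a byte array back to an object.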
Serializing MyMessage in Producer
Create a serializer class that implements org.apache.kafka.common.serialization.Serializer.
The serialize() method receives your object and returns its serialized form as a byte array.
For example, a serializer for MyMessage objects in the producer looks as follows:
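import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MyValueSerializer implements Serializer<MyMessage>
{
    // True when this instance serializes message keys, false for message values.
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        // A null message is passed through as a null payload.
        if (message == null) {
            return null;
        }

        try {
            // Placeholder: serialize your MyMessage object into a byte array named "bytes" here.

            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {
        // No resources to release.
    }
}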
Deserializing MyMessage in Consumer
Create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.
The deserialize() method receives the serialized value as a byte array and returns your object.
For example, a deserializer for MyMessage objects in the consumer looks as follows:
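import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    // True when this instance deserializes message keys, false for message values.
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String topic, byte[] value)
    {
        // A null payload is passed through as a null object.
        if (value == null) {
            return null;
        }

        try {
            MyMessage message = new MyMessage();

            // Placeholder: populate message from the bytes in value here.

            return message;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {
        // No resources to release.
    }
}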
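The conversion steps left open in serialize() and deserialize() above depend entirely on your message type. As a minimal sketch, assuming a hypothetical MyMessage with two fields (id and text) and a hypothetical helper class MyMessageCodec, neither of which is part of Kafka or the adapter, the conversion could be written with the standard java.io data streams:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical message type; the fields are illustrative only.
final class MyMessage {
    final long id;
    final String text;

    MyMessage(long id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Hypothetical helper that keeps both conversion directions in one place.
final class MyMessageCodec {
    private MyMessageCodec() {
    }

    // Writes the fields in a fixed order: id first, then text.
    static byte[] toBytes(MyMessage message) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(message.id);
            out.writeUTF(message.text);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static MyMessage fromBytes(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            long id = in.readLong();
            String text = in.readUTF();
            return new MyMessage(id, text);
        }
    }
}
```

With such a helper, serialize() could return MyMessageCodec.toBytes(message) and deserialize() could return MyMessageCodec.fromBytes(value). Both methods declare IOException, which matches the catch clauses in the examples above. Any other encoding works equally well, as long as the serializer and deserializer agree on the format.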
Note:
Kafka message keys are serialized and deserialized in the same way as message values.
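To illustrate how keys and values are handled symmetrically, the sketch below builds the settings a plain Kafka client would use to select serializer and deserializer classes. The property names are the standard Kafka client settings. The class names under com.example and the SerializerConfigSketch class are hypothetical, and this section does not describe how the adapter connection exposes these settings, so treat this as an illustration rather than adapter configuration.

```java
import java.util.Properties;

// Hypothetical holder for the example settings.
final class SerializerConfigSketch {
    private SerializerConfigSketch() {
    }

    // Producer side: one serializer class for keys, one for values.
    static Properties producerProperties() {
        Properties props = new Properties();
        // The class names are placeholders for your own implementations.
        props.setProperty("key.serializer", "com.example.MyKeySerializer");
        props.setProperty("value.serializer", "com.example.MyValueSerializer");
        return props;
    }

    // Consumer side: the matching deserializer classes.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("key.deserializer", "com.example.MyKeyDeserializer");
        props.setProperty("value.deserializer", "com.example.MyValueDeserializer");
        return props;
    }
}
```

Kafka calls configure() on each instance with isKey set to true for the key class and false for the value class, so a single implementation can serve both roles if it needs to behave differently for keys.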