Kafka - Creating custom serializers

Create Kafka serializers for JSON, Kryo and Smile.

Posted on 10 March 2016

I am currently working on a pet project involving Spark and Kafka, and I noticed that there are very few examples of how to create your own serializers for the new 0.9.0.0 API. In this blog post I will show how to create serializers for my own custom class that (de)serialize to and from four different formats: strings, JSON, Smile and Kryo.

The code can be found at this repository.

Introduction

So what are serializers? Kafka stores and transports byte arrays in its queue. But in general you don’t want to work with byte arrays; you want to use the POJOs directly! Before version 0.9.0.0 the Kafka Java API used Encoders/Decoders instead but these have been replaced by (de)serializers in the new API.

The (de)serializer is responsible for translating between our POJO (SensorReading) and the byte array provided by Kafka. For every POJO you want to produce for / consume from a Kafka topic you have to supply your own (de)serializer.

Implementation

To implement a serializer you need to create a class that implements the org.apache.kafka.common.serialization.Serializer interface. For a deserializer it’s the same, but we implement org.apache.kafka.common.serialization.Deserializer instead.

They each require you to implement 3 methods: configure (called at start up with the configuration properties), close (called when you close the Kafka session) and the serialize / deserialize method where the actual magic happens. You can implement each interface in a separate class like the standard serializers do, but I found it more convenient to implement both interfaces in a single class.
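As a rough sketch of that shape, the class below implements both interfaces at once. To keep it compilable without the Kafka jar, Serializer and Deserializer here are local stand-ins that only mirror the three methods of the real Kafka interfaces, and Utf8StringSerde is a name I made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stand-ins that mirror the methods of Kafka's Serializer/Deserializer interfaces.
// In a real project you would import them from org.apache.kafka.common.serialization.
interface Serializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}

interface Deserializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data);
    void close();
}

// Hypothetical example class: one implementation covering both directions.
class Utf8StringSerde implements Serializer<String>, Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure in this sketch.
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

Because configure and close have the same signature in both interfaces, a single implementation of each satisfies both.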

StringReadingSerializer

So let’s take a look at the serialize() method of the simplest custom implementation, StringReadingSerializer:

public byte[] serialize(String s, SensorReading sensorReading) {
    String line = String.format(Locale.ROOT, "%s,%s,%s,%.4f",
      sensorReading.getTime(),
      sensorReading.getSensor().getId(),
      sensorReading.getSensor().getType().toString(),
      sensorReading.getValue());

    return line.getBytes(CHARSET);
}

It serializes the provided SensorReading POJO by simply concatenating the values into a comma-separated string. The deserialize() method is slightly longer and does the reverse:

public SensorReading deserialize(String topic, byte[] bytes) {
    try {
        String[] parts = new String(bytes, CHARSET).split(",");

        long time = Long.parseLong(parts[0]);
        Sensor.Type type = Sensor.Type.valueOf(parts[2]);
        double value = Double.parseDouble(parts[3]);

        return new SensorReading(new Sensor(parts[1], type), time, value);
    }
    catch(Exception e) {
        throw new IllegalArgumentException("Error reading bytes", e);
    }
}

That’s all there is to it when it comes to converting between data and byte arrays.
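One consequence of this format is worth spelling out: %.4f keeps only four decimals, so a value does not necessarily survive a round trip unchanged, and an id containing a comma would break the split. The sketch below reproduces just the formatting and parsing steps on plain values. CsvLineDemo, the sample values and the use of UTF-8 are my assumptions here, standing in for the real SensorReading code and its CHARSET constant.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper that mirrors the line format used by StringReadingSerializer.
class CsvLineDemo {
    static byte[] encode(long time, String id, String type, double value) {
        String line = String.format(Locale.ROOT, "%s,%s,%s,%.4f", time, id, type, value);
        return line.getBytes(StandardCharsets.UTF_8); // UTF-8 is an assumption
    }

    // Returns the comma-separated fields of an encoded line.
    static String[] decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8).split(",");
    }

    public static void main(String[] args) {
        String[] parts = decode(encode(1457600000000L, "sensor-1", "TEMPERATURE", 21.123456));
        // The value comes back rounded to four decimals.
        System.out.println(Double.parseDouble(parts[3]));
    }
}
```

If exact values matter, a binary format such as the Kryo one further down avoids the rounding.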

However, we obviously don’t want to write cumbersome string (de)serialization functions for our object trees. There are plenty of more convenient solutions that need less boilerplate.

KryoReadingSerializer

One of my favorite binary serialization libraries is Kryo. It’s fast, easy to use and gives you very compact results. Kryo can serialize a lot of types out of the box, but for custom POJOs you can provide a simple Kryo serializer.

One side note: if you check out the source for KryoReadingSerializer you’ll notice that I keep the Kryo instance in thread local storage. This is because a Kryo instance is not thread safe and is rather costly to construct. So keep in mind that this is the recommended way to use Kryo in multithreaded environments!
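The thread local pattern itself looks roughly like the sketch below. To keep it runnable without the Kryo jar, ExpensiveCodec is a made-up stand-in for a Kryo instance and CodecHolder is a hypothetical name; it is not the code from the repository.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for an object that is costly to build and not thread safe (like a Kryo instance).
class ExpensiveCodec {
    // Counts constructions so the reuse per thread can be observed.
    static final AtomicInteger CREATED = new AtomicInteger();

    ExpensiveCodec() {
        CREATED.incrementAndGet();
    }
}

class CodecHolder {
    // Each thread lazily gets its own instance on first use and then keeps reusing it.
    private static final ThreadLocal<ExpensiveCodec> CODECS =
        ThreadLocal.withInitial(ExpensiveCodec::new);

    static ExpensiveCodec get() {
        return CODECS.get();
    }
}
```

With Kryo, the supplier passed to ThreadLocal.withInitial would create the Kryo instance and register the custom serializer on it before returning it.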

So first let’s define the serializer Kryo uses internally (not to be confused with the Kafka serializer):

private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<SensorReading> {
    public void write(Kryo kryo, Output output, SensorReading sensorReading) {
        output.writeString(sensorReading.getSensor().getId());
        output.writeString(sensorReading.getSensor().getType().toString());
        output.writeLong(sensorReading.getTime(), true);
        output.writeDouble(sensorReading.getValue());
    }

    public SensorReading read(Kryo kryo, Input input, Class<SensorReading> aClass) {
        String id = input.readString();
        Sensor.Type type = Sensor.Type.valueOf(input.readString());

        return new SensorReading(
            new Sensor(id, type),
            input.readLong(true),
            input.readDouble());
    }
}

As you can see, Kryo serializers handle both reading and writing. The write (serialize) call writes the values, and the read call reads the same values back in the same order.

We then tell Kryo to use this serializer to serialize our SensorReading objects:

Kryo kryo = new Kryo();
kryo.addDefaultSerializer(SensorReading.class, new KryoInternalSerializer());

So if we now take a look at the Kafka serialize/deserialize methods they are very straightforward:

public byte[] serialize(String s, SensorReading sensorReading) {
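    // As far as I can tell, 100 bytes is both the initial and the maximum buffer size here,
    // so a reading that serializes to more than that would overflow it.
    ByteBufferOutput output = new ByteBufferOutput(100);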
    kryos.get().writeObject(output, sensorReading);
    return output.toBytes();
}

public SensorReading deserialize(String s, byte[] bytes) {
    try {
        return kryos.get().readObject(new ByteBufferInput(bytes), SensorReading.class);
    }
    catch(Exception e) {
        throw new IllegalArgumentException("Error reading bytes", e);
    }
}

Since we told Kryo to use the provided serializer, we can now easily (de)serialize to and from byte arrays.
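The rule that fields must be read back in the order they were written is not specific to Kryo. As a library-free illustration, the sketch below uses java.io.DataOutputStream and DataInputStream as a stand-in for Kryo’s Output and Input. It does not produce Kryo’s wire format, and OrderedFieldsDemo and the sample field values are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class OrderedFieldsDemo {
    // Writes id, type, time and value in a fixed order.
    static byte[] write(String id, String type, long time, double value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(id);
            out.writeUTF(type);
            out.writeLong(time);
            out.writeDouble(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the same order and joins them for display.
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String id = in.readUTF();
            String type = in.readUTF();
            long time = in.readLong();
            double value = in.readDouble();
            return id + "/" + type + "/" + time + "/" + value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping two of the read calls would either throw or silently yield garbage, which is why the read method has to mirror the write method line by line.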

JacksonReadingSerializer

The last serializer I want to demonstrate, the JacksonReadingSerializer, actually supports two formats: JSON and Smile. It uses the excellent Jackson library to support these formats. Smile is basically binary JSON. It is not as small and fast as Kryo (Smile still stores key-value pairs), but it’s convenient because you can switch from JSON to Smile with just a single line of code:

ObjectMapper mapper = new ObjectMapper(new SmileFactory());
Note
Make sure you include the jackson-dataformat-smile dependency; it’s not part of the standard databind package!

Since my sensor and sensor reading POJOs are immutable, they don’t have a public parameterless constructor. What I then often do is simply provide a 'helper' class with just a few public fields that I use as a struct to hold the data. Another option would be to add @JsonProperty annotations to the constructor parameters to tell Jackson how to map the JSON to the constructor arguments. Of course this is only an option if you have control over that code.

So if we look at the serialize method, you can see that I first translate from SensorReading to SerializationHelper and then use the ObjectMapper to translate to a byte array:

public byte[] serialize(String s, SensorReading sensorReading) {
    try {
        return mapper.writeValueAsBytes(from(sensorReading));
    }
    catch(JsonProcessingException e) {
        throw new IllegalArgumentException(e);
    }
}
Note
You don’t need this translation step if your object is a basic bean: Jackson can serialize these without any configuration.

And our deserialize function is just as simple:

public SensorReading deserialize(String s, byte[] bytes) {
    try {
        return mapper.readValue(bytes, SerializationHelper.class).to();
    }
    catch(IOException e) {
        throw new IllegalArgumentException(e);
    }
}

The translation between SerializationHelper and SensorReading is done in SerializationHelper’s from() and to() methods.
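A helper of this kind could look roughly like the sketch below. The Sensor and SensorReading classes are minimal stand-ins I wrote to make it self-contained, and the field names and enum values are assumptions; the real classes in the repository may differ.

```java
// Minimal immutable stand-ins for the post's POJOs (assumed shape).
final class Sensor {
    enum Type { TEMPERATURE, HUMIDITY } // hypothetical enum values

    private final String id;
    private final Type type;

    Sensor(String id, Type type) { this.id = id; this.type = type; }
    String getId() { return id; }
    Type getType() { return type; }
}

final class SensorReading {
    private final Sensor sensor;
    private final long time;
    private final double value;

    SensorReading(Sensor sensor, long time, double value) {
        this.sensor = sensor;
        this.time = time;
        this.value = value;
    }
    Sensor getSensor() { return sensor; }
    long getTime() { return time; }
    double getValue() { return value; }
}

// Struct-like helper: public fields plus the implicit no-arg constructor,
// which is the shape Jackson can fill in without extra annotations.
class SerializationHelper {
    public String id;
    public String type;
    public long time;
    public double value;

    // Copies the values out of the immutable reading.
    static SerializationHelper from(SensorReading reading) {
        SerializationHelper helper = new SerializationHelper();
        helper.id = reading.getSensor().getId();
        helper.type = reading.getSensor().getType().toString();
        helper.time = reading.getTime();
        helper.value = reading.getValue();
        return helper;
    }

    // Rebuilds the immutable reading through its constructor.
    SensorReading to() {
        return new SensorReading(new Sensor(id, Sensor.Type.valueOf(type)), time, value);
    }
}
```

The helper keeps all Jackson-specific concerns out of the domain classes, at the cost of one extra copy per message.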

Configuring Kafka

The Example class can be started as either a producer (pushing random SensorReadings onto the queue) or a consumer. A Kafka consumer/producer is told which (de)serializer to use through its class name in the supplied property file. So if we want to use the JacksonReadingSerializer in a producer:

properties.put("value.serializer", JacksonReadingSerializer.class.getName());

To configure a consumer we supply the same class as a value deserializer:

properties.put("value.deserializer", JacksonReadingSerializer.class.getName());
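For context, a fuller set of properties could look something like the sketch below. It only builds java.util.Properties objects, so it runs without Kafka on the classpath. The package name com.example, the group id and the choice of string keys are assumptions for illustration; localhost:9092 is Kafka’s default port.

```java
import java.util.Properties;

class ExampleConfig {
    // Hypothetical fully qualified name; use the package of your own serializer class.
    static final String READING_SERDE = "com.example.JacksonReadingSerializer";

    static Properties producerProperties() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        // Assumes message keys are plain strings.
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", READING_SERDE);
        return p;
    }

    static Properties consumerProperties() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", "example-group"); // hypothetical consumer group
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The same class serves as the deserializer because it implements both interfaces.
        p.put("value.deserializer", READING_SERDE);
        return p;
    }
}
```

These objects would then be passed to the KafkaProducer and KafkaConsumer constructors.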

And since our JacksonReadingSerializer can be configured to use Smile, we can also set a custom key, one we made up ourselves, to true:

properties.put("value.serializer", JacksonReadingSerializer.class.getName());
properties.put("value.serializer.jackson.smile", "true");
Note
The configure method simply receives all the properties you used to configure the Producer/Consumer, passed in as a Map.
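Reading such a custom key inside configure could be done along the lines of the sketch below. SmileFlag is a hypothetical helper and not the code from the repository; it only shows how to look the value up in the map that configure receives.

```java
import java.util.Map;

class SmileFlag {
    // The made-up key from the example above.
    static final String KEY = "value.serializer.jackson.smile";

    // Values may arrive as a String (from a property file) or as a Boolean (set in code),
    // so go through toString() before parsing.
    static boolean isEnabled(Map<String, ?> configs) {
        Object raw = configs.get(KEY);
        return raw != null && Boolean.parseBoolean(raw.toString());
    }
}
```

Inside configure, the result would decide whether the ObjectMapper is created with a SmileFactory or with the default JSON factory.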

Running the example

Make sure you have a Kafka instance running first! Then build the example using mvn clean install. You can also run Example.java from your IDE.

To run the example as a producer:

java -jar target/kafka-serializer-1.0-SNAPSHOT-jar-with-dependencies.jar produce kryo test-topic

This will connect to Kafka on the default port and start producing Kryo-serialized readings on the test-topic topic.

To start a consumer:

java -jar target/kafka-serializer-1.0-SNAPSHOT-jar-with-dependencies.jar consume kryo test-topic

This will also connect to Kafka, then consume and print the readings.

More details on the different options that can be supplied can be found in the readme.

Conclusion

The new Kafka API is very clean and very easy to use. Creating serializers is simple; you just have to pick a format. I would suggest going with whatever method you currently use, as long as it can translate between your objects and byte arrays.

One downside of the new API is that the Spark Kafka connector does not support it yet. It depends on an older Kafka API that does not accept serializers. A way to temporarily circumvent this is to Base64 en/decode byte arrays to strings before pushing them into the queue and then manually call the serializers on them. This bit can then be removed when Spark supports the new API.
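The Base64 step of that workaround only needs the JDK. A minimal sketch, with Base64Bridge as a made-up name, could be:

```java
import java.util.Base64;

class Base64Bridge {
    // Wraps serialized bytes in a string so they can travel through a String-only API.
    static String toText(byte[] serialized) {
        return Base64.getEncoder().encodeToString(serialized);
    }

    // Recovers the original bytes, ready to be handed to the deserializer.
    static byte[] fromText(String text) {
        return Base64.getDecoder().decode(text);
    }
}
```

On the producing side you would call toText on the output of serialize, and on the consuming side pass the result of fromText to deserialize. Keep in mind that Base64 makes each message roughly a third larger.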

I hope you enjoyed the read. Let me know if anything is unclear!