Publishing Apache Avro messages on a Apache Kafka topic

In earlier posts I played around with both Apache Avro and Apache Kafka. The next goal was naturally to combine both and start publishing binary Apache Avro data on a Apache Kafka topic.

screen-shot-2016-09-11-at-3-28-49-pm

Generating Java from the Avro schema

I use the  Avro schema “location.avsc” from my earlier post.

$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .

Which results in the Location.java for our project.

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff

Make sure we have the maven dependencies right in our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
  <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>

We can now use the Location object in Java to build our binary Avro message

public ByteArrayOutputStream GenerateAvroStream() throws IOException
{
    // Schema
    String schemaDescription = Location.getClassSchema().toString();
    Schema s = Schema.parse(schemaDescription);
    System.out.println("Schema parsed: " + s);

    // Encode the data using JSON schema and embed the schema as metadata along with the data.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(s);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
    dataFileWriter.create(s, outputStream);

    // Build AVRO message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    location.setLatitude(51.687402);
    location.setLongtitude(5.307759);
    System.out.println("Message location " + location.toString());

    dataFileWriter.append(location);
    dataFileWriter.close();
    System.out.println("Encode outputStream: " + outputStream);

    return outputStream;
}

When we have our byteArrayOutput stream we can start publishing it on a Apache Kafka topic.

public void ProduceKafkaByte()
{
    try
    {
        // Get the Apache AVRO message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data);

        // Start KAFKA publishing
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord = null;
        producerRecord = new ProducerRecord<String, byte[]>("test","1",data.toByteArray());
        messageProducer.send(producerRecord);
        messageProducer.close();
    }
    catch(IOException ex)
    {
        System.out.println ("Well this error happened: " + ex.toString());
    }
}

When we subscribe on our topic we can see the bytestream cruising by:

INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ#
=######@#####;@##<##O#P#######016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)

All code available in github here.
github