Publishing Apache Avro messages on a Apache Kafka topic

In earlier posts I played around with both Apache Avro and Apache Kafka. The next goal was naturally to combine both and start publishing binary Apache Avro data on a Apache Kafka topic.


Generating Java from the Avro schema

I use the  Avro schema “location.avsc” from my earlier post.

$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .

Which results in the for our project.

Make sure we have the maven dependencies right in our pom.xml:

We can now use the Location object in Java to build our binary Avro message

public ByteArrayOutputStream GenerateAvroStream() throws IOException
    // Schema
    String schemaDescription = Location.getClassSchema().toString();
    Schema s = Schema.parse(schemaDescription);
    System.out.println("Schema parsed: " + s);
    // Encode the data using JSON schema and embed the schema as metadata along with the data.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(s);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
    dataFileWriter.create(s, outputStream);
    // Build AVRO message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    System.out.println("Message location " + location.toString());
    System.out.println("Encode outputStream: " + outputStream);
    return outputStream;
When we have our byteArrayOutput stream we can start publishing it on a Apache Kafka topic.

public void ProduceKafkaByte()
        // Get the Apache AVRO message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data);
        // Start KAFKA publishing
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord = null;
        producerRecord = new ProducerRecord<String, byte[]>("test","1",data.toByteArray());
    catch(IOException ex)
        System.out.println ("Well this error happened: " + ex.toString());
When we subscribe on our topic we can see the bytestream cruising by:

All code available in github here.