In earlier posts I played around with both Apache Avro and Apache Kafka. The next goal was naturally to combine the two and start publishing binary Apache Avro data on an Apache Kafka topic.
Generating Java from the Avro schema
I use the Avro schema “location.avsc” from my earlier post.
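For reference, this is what the schema looks like (reconstructed here from the embedded copy you can see in the consumer output further down):

{
  "type": "record",
  "name": "Location",
  "namespace": "nl.rubix.avro",
  "doc": "A schema for vehicle movement events",
  "fields": [
    { "name": "vehicle_id", "type": "string", "doc": "id of the vehicle" },
    { "name": "timestamp", "type": "long", "doc": "time in seconds" },
    { "name": "latitude", "type": "double" },
    { "name": "longtitude", "type": "double" }
  ]
}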
$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .
This generates Location.java for our project.
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff
Make sure we have the Maven dependencies right in our pom.xml:
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.1</version>
  </dependency>
</dependencies>

Note that avro-maven-plugin is a build plugin, not a dependency. We already generated Location.java with avro-tools above, but if you want Maven to do the code generation for you, it belongs in the build section instead:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.8.1</version>
    </plugin>
  </plugins>
</build>
We can now use the Location object in Java to build our binary Avro message:
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

public ByteArrayOutputStream generateAvroStream() throws IOException
{
    // Take the schema straight from the generated class; no need to
    // round-trip it through a string and Schema.parse().
    Schema schema = Location.getClassSchema();
    System.out.println("Schema: " + schema);

    // Write an Avro container file into memory; the container format
    // embeds the schema as metadata along with the binary-encoded data.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<Location> writer = new SpecificDatumWriter<Location>(Location.class);
    DataFileWriter<Location> dataFileWriter = new DataFileWriter<Location>(writer);
    dataFileWriter.create(schema, outputStream);

    // Build the Avro message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    location.setLatitude(51.687402);
    location.setLongtitude(5.307759); // "longtitude", as spelled in the schema
    System.out.println("Message location: " + location.toString());

    dataFileWriter.append(location);
    dataFileWriter.close();
    return outputStream;
}
Once we have our ByteArrayOutputStream we can start publishing it on an Apache Kafka topic:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public void produceKafkaByte()
{
    try
    {
        // Get the Apache Avro message
        ByteArrayOutputStream data = generateAvroStream();
        System.out.println("Here comes the data: " + data);

        // Start Kafka publishing; the new producer only needs the
        // bootstrap servers and the key/value serializer classes.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord =
                new ProducerRecord<String, byte[]>("test", "1", data.toByteArray());
        messageProducer.send(producerRecord);
        messageProducer.close();
    }
    catch (IOException ex)
    {
        System.out.println("Well, this error happened: " + ex.toString());
    }
}
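For completeness, here is a minimal way to run it. The wrapping class name is hypothetical (the post only shows the two methods); it assumes both methods above live in this class:

public class AvroKafkaDemo
{
    public static void main(String[] args)
    {
        new AvroKafkaDemo().produceKafkaByte();
    }
}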
When we subscribe to our topic we can see the byte stream cruising by.
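To watch the raw messages I use the console consumer that ships with Kafka; the exact invocation below is an assumption based on a default single-node setup:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning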
INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ#
=######@#####;@##<##O#P#######016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)
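Notice that the schema itself is readable in the payload: because we write a full Avro container file per message, the schema travels along with the data. That also means a consumer can decode the bytes back into Location objects without knowing the schema in advance. A minimal sketch, assuming the same "test" topic; the group.id is an arbitrary illustrative value:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public void consumeKafkaByte() throws IOException
{
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "location-consumers"); // any unused group id works
    props.put("auto.offset.reset", "earliest");  // read from the start for this demo
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Arrays.asList("test"));

    ConsumerRecords<String, byte[]> records = consumer.poll(10000);
    for (ConsumerRecord<String, byte[]> record : records)
    {
        // Each message is a complete Avro container file, so DataFileStream
        // can read the embedded schema and decode the records directly.
        DataFileStream<Location> stream = new DataFileStream<Location>(
                new ByteArrayInputStream(record.value()),
                new SpecificDatumReader<Location>(Location.class));
        for (Location location : stream)
        {
            System.out.println("Received location: " + location);
        }
        stream.close();
    }
    consumer.close();
}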
All code is available on GitHub here.
