In earlier posts I played around with both Apache Avro and Apache Kafka. The next goal was naturally to combine the two and start publishing binary Apache Avro data on an Apache Kafka topic.
Generating Java from the Avro schema
I use the Avro schema “location.avsc” from my earlier post.
```shell
$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .
```
This generates Location.java, in the nl.rubix.avro package, for our project.
```java
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff
```
Make sure the Maven dependencies in our pom.xml are right:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
```
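We can now use the generated Location class to build our binary Avro message. DataFileWriter writes an Avro object container file, so the schema is embedded as metadata alongside the encoded record.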
```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import nl.rubix.avro.Location;

public ByteArrayOutputStream GenerateAvroStream() throws IOException {
    // Schema: the generated class already carries the parsed schema
    Schema s = Location.getClassSchema();
    System.out.println("Schema: " + s);

    // Build the Avro message with the generated setters
    Location location = new Location();
    location.setVehicleId("VHC-001");
    location.setTimestamp(System.currentTimeMillis() / 1000L); // seconds since epoch
    location.setLatitude(51.687402);
    location.setLongtitude(5.307759); // field name as spelled in location.avsc
    System.out.println("Message location " + location);

    // Write an in-memory Avro container file: the schema goes into the
    // file header as metadata, followed by the binary-encoded record
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<Location> writer = new SpecificDatumWriter<>(Location.class);
    try (DataFileWriter<Location> dataFileWriter = new DataFileWriter<>(writer)) {
        dataFileWriter.create(s, outputStream);
        dataFileWriter.append(location);
    } // close() flushes the writer; closing a ByteArrayOutputStream has no effect

    System.out.println("Encoded " + outputStream.size() + " bytes");
    return outputStream;
}
```
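DataFileWriter hides what the record itself looks like on the wire. As a rough illustration, here is a minimal hand-rolled sketch of the Avro specification's binary encoding rules applied to the four Location fields: longs as zig-zag varints, strings as a length prefix plus UTF-8 bytes, doubles as 8 little-endian bytes, and a record as its fields concatenated in schema order. It deliberately avoids the Avro library; the class LocationEncoding and its methods are hypothetical, and the field order (vehicle_id, timestamp, latitude, longtitude) is assumed from location.avsc.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: hand-written Avro binary encoding for the Location fields,
// following the Avro specification rather than the Avro library's encoder.
class LocationEncoding {

    // Avro long: zig-zag maps signed to unsigned (0->0, -1->1, 1->2, -2->3, ...),
    // then 7 bits per byte, lowest group first, high bit set on all but the last byte.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro string: byte length encoded as an Avro long, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Avro double: 8 bytes of IEEE 754, little-endian.
    static void writeDouble(ByteArrayOutputStream out, double d) {
        byte[] bytes = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(d).array();
        out.write(bytes, 0, bytes.length);
    }

    // A record is its fields encoded in schema order, with no names or tags.
    static byte[] encodeLocation(String vehicleId, long timestamp,
                                 double latitude, double longtitude) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, vehicleId);
        writeLong(out, timestamp);
        writeDouble(out, latitude);
        writeDouble(out, longtitude);
        return out.toByteArray();
    }
}
```

For example, the vehicle id "VHC-001" (7 bytes) is prefixed by the single byte 0x0E, because the length 7 zig-zags to 14. Because no field names are stored, a reader always needs the writer's schema, which is why the container file carries it in its header.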
Once we have our ByteArrayOutputStream, we can publish its contents on an Apache Kafka topic.
```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public void ProduceKafkaByte() {
    // Producer configuration: String keys, raw byte[] values.
    // ("serializer.class" belongs to the old Scala producer and is ignored here.)
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    try (KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<>(props)) {
        // Get the Apache Avro message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data.size() + " bytes");

        // Publish on topic "test" with key "1"; send() is asynchronous and
        // closing the producer at the end of this block flushes pending sends
        ProducerRecord<String, byte[]> producerRecord =
                new ProducerRecord<>("test", "1", data.toByteArray());
        messageProducer.send(producerRecord);
    } catch (IOException ex) {
        System.out.println("Well this error happened: " + ex);
    }
}
```
When we subscribe to our topic, we can see the byte stream cruising by (here interleaved with ZooKeeper log output):
```text
INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ# =######@#####;@##<##O#P#######
016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)
```
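The readable "Objavro.schema" fragment is the header of the Avro object container file: the magic bytes "Obj" plus a version byte 1, a metadata map whose entries include avro.schema, then a 16-byte sync marker before the data blocks. A consumer can use this to recognise the payload and recover the writer's schema from the message itself. Below is a minimal pure-JDK sketch of reading that header, written against the Avro specification; the class ContainerHeader is hypothetical. In practice Avro's own DataFileStream, fed a ByteArrayInputStream over the message value, does this for you and exposes the schema via getSchema().

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: reads the header of an Avro object container file
// (magic bytes, then the metadata map) as described in the Avro specification.
class ContainerHeader {
    private final byte[] buf;
    private int pos = 0;

    ContainerHeader(byte[] buf) { this.buf = buf; }

    // True if the payload starts with the container magic: 'O', 'b', 'j', 1
    static boolean isContainerFile(byte[] b) {
        return b.length >= 4 && b[0] == 'O' && b[1] == 'b' && b[2] == 'j' && b[3] == 1;
    }

    // Avro long: varint (7 bits per byte, lowest group first), then zig-zag decode
    long readLong() {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // Avro bytes/string: length as an Avro long, then the raw bytes
    byte[] readBytes() {
        int len = (int) readLong();
        byte[] out = Arrays.copyOfRange(buf, pos, pos + len);
        pos += len;
        return out;
    }

    // Returns the metadata entries in order; afterwards pos points at the
    // 16-byte sync marker that precedes the first data block.
    Map<String, byte[]> readMetadata() {
        if (!isContainerFile(buf)) {
            throw new IllegalArgumentException("not an Avro container file");
        }
        pos = 4;
        Map<String, byte[]> meta = new LinkedHashMap<>();
        long count;
        while ((count = readLong()) != 0) {   // a map is a series of blocks ending with 0
            if (count < 0) {                  // negative count: block byte size follows
                count = -count;
                readLong();                   // size not needed for sequential reading
            }
            for (long i = 0; i < count; i++) {
                String key = new String(readBytes(), StandardCharsets.UTF_8);
                meta.put(key, readBytes());
            }
        }
        return meta;
    }
}
```

Embedding the full schema in every Kafka message, as here, costs a few hundred bytes per record; that is the trade-off of using the container format as the message value.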
All code is available on GitHub here.