In earlier posts I played around with both Apache Avro and Apache Kafka. The natural next step was to combine the two and start publishing binary Apache Avro data on an Apache Kafka topic.
Generating Java from the Avro schema
I use the Avro schema “location.avsc” from my earlier post.
$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .
This generates the Location.java for our project.
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff
Make sure we have the Maven dependencies right in our pom.xml:
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.1</version>
  </dependency>
</dependencies>
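As an alternative to running the avro-tools jar by hand, the avro-maven-plugin from the dependency list above can generate the Java classes during the build. A sketch of the plugin configuration, assuming the schema file lives in src/main/avro:

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.8.1</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With this in place, `mvn generate-sources` regenerates Location.java whenever location.avsc changes.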
We can now use the Location object in Java to build our binary Avro message:
public ByteArrayOutputStream GenerateAvroStream() throws IOException {
    // Schema
    String schemaDescription = Location.getClassSchema().toString();
    Schema s = new Schema.Parser().parse(schemaDescription);
    System.out.println("Schema parsed: " + s);

    // Encode the data and embed the schema as metadata along with it.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(s);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
    dataFileWriter.create(s, outputStream);

    // Build the Avro message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    location.setLatitude(51.687402);
    location.setLongtitude(5.307759);
    System.out.println("Message location " + location.toString());

    dataFileWriter.append(location);
    dataFileWriter.close();
    System.out.println("Encode outputStream: " + outputStream);
    return outputStream;
}
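To sanity-check the bytes before they go anywhere near Kafka, we can read the stream straight back with a DataFileReader. A minimal round-trip sketch, using a GenericRecord and an inlined copy of the schema instead of the generated Location class so it is self-contained (field names taken from the schema above):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroRoundTrip {

    // Inline copy of the location schema (doc attributes omitted for brevity).
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Location\",\"namespace\":\"nl.rubix.avro\","
      + "\"fields\":[{\"name\":\"vehicle_id\",\"type\":\"string\"},"
      + "{\"name\":\"timestamp\",\"type\":\"long\"},"
      + "{\"name\":\"latitude\",\"type\":\"double\"},"
      + "{\"name\":\"longtitude\",\"type\":\"double\"}]}");

    // Encode one record into an Avro container, same approach as GenerateAvroStream.
    static byte[] encode() throws IOException {
        GenericRecord rec = new GenericData.Record(SCHEMA);
        rec.put("vehicle_id", "VHC-001");
        rec.put("timestamp", 1476810000L);
        rec.put("latitude", 51.687402);
        rec.put("longtitude", 5.307759);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA));
        writer.create(SCHEMA, out);
        writer.append(rec);
        writer.close();
        return out.toByteArray();
    }

    // Decode the first record; the reader picks the schema up from the container header.
    static GenericRecord decodeFirst(byte[] bytes) throws IOException {
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new SeekableByteArrayInput(bytes),
                new GenericDatumReader<GenericRecord>())) {
            return reader.next();
        }
    }
}
```

Note that the reader needs no schema of its own: because DataFileWriter embeds the writer schema in the container header, GenericDatumReader can reconstruct the record from the bytes alone.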
Once we have our ByteArrayOutputStream we can start publishing it on an Apache Kafka topic.
public void ProduceKafkaByte() {
    try {
        // Get the Apache Avro message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data);

        // Start Kafka publishing; the new producer API only needs the
        // bootstrap servers and the key/value serializers.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord =
            new ProducerRecord<String, byte[]>("test", "1", data.toByteArray());
        messageProducer.send(producerRecord);
        messageProducer.close();
    } catch (IOException ex) {
        System.out.println("Well this error happened: " + ex.toString());
    }
}
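On the other side of the topic, a KafkaConsumer with the matching deserializers can pick the bytes up again; decoding them is then the DataFileReader logic from before. A sketch, assuming the same local broker and topic name and a hypothetical consumer group id:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LocationConsumer {

    // Mirror of the producer settings: String keys, raw byte[] values.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "location-consumers"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps());
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<String, byte[]> record : records) {
                // record.value() holds the complete Avro container;
                // hand it to a DataFileReader to get the Location back.
                System.out.println("Received " + record.value().length + " bytes");
            }
        }
    }
}
```

Because the producer ships the whole Avro container (schema included) as the message value, the consumer stays schema-agnostic at the Kafka level and only needs the ByteArrayDeserializer.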
When we subscribe to our topic we can see the bytestream cruising by:
INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ# =######@#####;@##<##O#P#######
[2016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)
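That readable schema JSON in the middle of the noise is no accident: DataFileWriter emits the Avro object container format, whose header starts with the magic bytes 'O', 'b', 'j', 1 and then carries the schema as metadata under the key "avro.schema". That is why "Objavro.schema" and the full record definition show up in front of the binary payload. A small check, using a throwaway schema:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class ContainerHeader {

    // Write an empty container just to inspect the header bytes.
    static byte[] containerBytes() throws IOException {
        // Minimal record schema, only here to get a header out.
        Schema s = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Dummy\",\"fields\":"
          + "[{\"name\":\"x\",\"type\":\"long\"}]}");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(s));
        writer.create(s, out);  // the header (magic + metadata) is written here
        writer.close();
        return out.toByteArray();
    }
}
```

The schema riding along in every message is convenient for debugging but adds overhead per record; for high-volume topics a schema registry is the usual way to avoid shipping it each time.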
All code is available on GitHub here.