Getting started with Apache Kafka

Apache Kafka is a publish-subscribe messaging solution rethought as a distributed commit log.


The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.

Typical use cases for Kafka include stream processing, event sourcing, metrics collection, and in general any scenario where large volumes of data flow from a publisher to one or more subscribers. A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients, which makes it a very efficient and easily scalable high-volume messaging solution.

Kafka is therefore a good alternative to more traditional (JMS/MQ) message brokers. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, and so on). Compared to most messaging systems, Kafka offers better throughput plus built-in partitioning, replication, and fault tolerance, which makes it a good fit for large-scale message processing applications. And all of this is free.

Getting Started

The Kafka website has an excellent quickstart tutorial at https://kafka.apache.org/quickstart. Download the latest version from https://kafka.apache.org/downloads and work through the tutorial to send and receive your first messages from the console.

Playing around with Java

First we create a test topic.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testIteration
Created topic "testIteration".

Earlier versions of Kafka shipped with a default serializer, which caused a lot of confusion. Since 0.8.2 you need to pick a serializer yourself: either one that comes with the API, such as StringSerializer or ByteArraySerializer, or one you build yourself. Since both the key and the value in our example are strings, we use the StringSerializer.
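To illustrate the "build your own" option, here is a minimal sketch of a custom serializer. The class name UpperCaseSerializer is made up for this example; it implements Kafka's org.apache.kafka.common.serialization.Serializer interface and upper-cases strings before encoding them as UTF-8 bytes.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical hand-rolled serializer; not part of the Kafka API itself.
public class UpperCaseSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this simple example
    }

    @Override
    public byte[] serialize(String topic, String data) {
        // upper-case the value and encode it as UTF-8 bytes
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

You would then reference such a class by its fully qualified name in the key.serializer or value.serializer property.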

Use the following Apache Kafka library as a Maven dependency (pom.xml).

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
  </dependency>
</dependencies>
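Alternatively, if you only need the producer and consumer API, the lighter kafka-clients artifact should be sufficient; the kafka_2.11 artifact contains the full broker and pulls in the Scala runtime as well.

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
  </dependency>
</dependencies>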

The following code produces/publishes 10 messages to the Kafka topic.


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public void produceIteration()
{
    int amountMessages = 10; // 10 is enough for the demo

    // Minimal producer configuration: the broker to connect to and
    // the serializers for the message key and value (both strings here).
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    for (int i = 1; i <= amountMessages; i++)
    {
        // Key and value are both the message number; the topic was created earlier.
        ProducerRecord<String, String> data = new ProducerRecord<String, String>("testIteration", Integer.toString(i), Integer.toString(i));
        System.out.println("Publish message " + i + " - " + data);
        producer.send(data);
    }

    producer.close();
}
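Note that producer.send() is asynchronous; producer.close() flushes any outstanding messages before shutting down. For completeness, a minimal sketch of a wrapper to run the method (the class name ProducerDemo is an assumption, not from the original):

// Hypothetical wrapper class; only produceIteration() comes from the example above.
public class ProducerDemo {

    public static void main(String[] args) {
        new ProducerDemo().produceIteration();
    }

    // produceIteration() as shown above
}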

The messages can be received from the topic:

jvzoggel$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testIteration --property print.key=true --property print.timestamp=true

CreateTime:1474354960268        1       1
CreateTime:1474354960284        2       2
CreateTime:1474354960284        3       3
CreateTime:1474354960285        4       4
CreateTime:1474354960285        5       5
CreateTime:1474354960285        6       6
CreateTime:1474354960285        7       7
CreateTime:1474354960285        8       8
CreateTime:1474354960285        9       9
CreateTime:1474354960285        10      10
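The same can of course be done from Java. Below is a minimal sketch of a consumer using the new consumer API that ships with 0.10; the group.id value testGroup and the method name consumeIteration are arbitrary choices for this example.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public void consumeIteration()
{
    // Mirror image of the producer configuration: deserializers instead of
    // serializers, plus a consumer group id.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testGroup"); // arbitrary group name for this demo
    props.put("auto.offset.reset", "earliest"); // start from the beginning of the topic for a new group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("testIteration"));

    // Poll a few times; each poll returns a (possibly empty) batch of records.
    for (int i = 0; i < 10; i++)
    {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.println(record.key() + " - " + record.value());
        }
    }

    consumer.close();
}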