JHipster – Streaming beer with Kafka and Spring Cloud Stream

Now that our OrderService is up and running, it’s time to make it a little more robust and decoupled. In this blog post we’re gonna put Kafka in between the OrderResource controller and our Spring Boot back-end system and use Spring Cloud Stream to ease development:

[Diagram: OrderResource → Kafka topic → OrderService]

Upon creation of a JHipster application you're given the option Asynchronous messages using Apache Kafka. If you select it, your pom file and application.yml will come out all set up for Kafka and Spring Cloud Stream. You'll also get a Docker Compose file to spin up Kafka (and Zookeeper), and a MessagingConfiguration class will be generated. That's where you declare your input and output channels (channels are Spring Cloud Stream abstractions; they're the connection between the application and the message broker). If you follow the JHipster documentation on Kafka here – right after generating a virgin JHipster app – you should get a working flow up in no time.

Now, I wanna further improve upon the HelloBeer™ application we finished in my previous blog post, and I didn't check the Asynchronous messages option when I initially created it. It's not possible to add the option afterwards via the CLI, but luckily it's not really that hard to add the necessary components manually. So let's get started and make those beer orders flow through a Kafka topic straight into our back-end application.
As always the finished code can be found on GitHub.

Kafka Docker image

Alright, this guy I just ripped from a new JHipster app with the messaging option enabled. Add this kafka.yml file to the src/main/docker directory:

version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper:3.4.6
        ports:
            - 2181:2181
    kafka:
        image: wurstmeister/kafka:1.0.0
        environment:
            KAFKA_ADVERTISED_HOST_NAME: localhost
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_CREATE_TOPICS: "topic-jhipster:1:1"
        ports:
            - 9092:9092

You can spin up Kafka now with this file by issuing the following command:

docker-compose -f src/main/docker/kafka.yml up -d
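
To check that the containers actually came up before moving on, the standard Docker Compose commands will do:

docker-compose -f src/main/docker/kafka.yml ps
docker-compose -f src/main/docker/kafka.yml logs kafka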

Adding the dependencies

The following dependencies are needed to enable Spring Cloud Stream and have it integrate with Kafka:

<!-- Kafka support -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>
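
Note that these dependencies are versionless: they assume the Spring Cloud release train BOM is imported in the dependencyManagement section of your pom. A JHipster-generated pom should already have that covered, but if yours doesn't, the import would look something like this (Edgware.SR3 is just an example release train from this era, pick whatever matches your Spring Boot version):

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>Edgware.SR3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>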

Configuring the channels

Since we only need one Kafka topic, we can use the default channels that Spring Cloud Stream has to offer. We need one input and one output channel, so we can use the combined Processor interface. For a more complex setup with multiple topics, you'll have to write your own custom interfaces for the channels (this is also what the JHipster documentation example does). For more information about channels check the Spring Cloud Stream Reference Guide.
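
To give you an idea, such a custom interface is little more than a pair of annotated channel getters. A minimal sketch – the OrderChannels name and its channel names are made up by me, not generated:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface OrderChannels {

    String ORDER_INPUT = "orderInput";
    String ORDER_OUTPUT = "orderOutput";

    // Channel the application consumes orders from
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();

    // Channel the application publishes orders to
    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();
}

You'd then bind it with @EnableBinding(OrderChannels.class) and configure spring.cloud.stream.bindings.orderInput and orderOutput instead of the default input and output.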

MessagingConfiguration

First add the configuration for the Processor channels. This is done in the MessagingConfiguration class. We'll add this guy to the config package, the place where JHipster stores all Spring Boot configuration.

package nl.whitehorses.hellobeer.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;

@EnableBinding(value = Processor.class)
public class MessagingConfiguration {
}

application-dev.yml

The overall configuration needs a few properties to let the application know where to find Kafka and to let Spring Cloud Stream channels bind to a Kafka topic. Let’s call the topic hb-orders. I’ll only put the configuration in the development configuration – application-dev.yml – for now:

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
            bindings:
                output:
                    destination: hb-orders
                    content-type: application/json
                input:
                    destination: hb-orders

Note that input and output are the default channel names when working with the default channel interfaces.
That’s it for the channel configuration. Now we can use them in our back-end code.
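
For the curious: those default names are just constants on the built-in Source and Sink interfaces, which Processor combines. Paraphrasing the Spring Cloud Stream sources:

public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}

public interface Processor extends Source, Sink {
}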

OrderResource – Publishing to Kafka

Let’s alter our OrderResource so it publishes the OrderDTO object to the output channel instead of calling the OrderService directly:

@RestController
@RequestMapping("/api/order")
public class OrderResource {

  private static final String ENTITY_NAME = "order";
  private final Logger log = LoggerFactory.getLogger(OrderResource.class);
  private final MessageChannel channel;

  public OrderResource(final Processor processor) {
    this.channel = processor.output();
  }

  @PostMapping("/process-order")
  @Timed
  public ResponseEntity<OrderDTO> processOrder(@Valid @RequestBody OrderDTO order) {
    log.debug("REST request to process Order : {}", order);
    if (order.getOrderId() == null) {
      throw new BadRequestAlertException("Error processing order", ENTITY_NAME, "orderfailure");
    }
    channel.send(MessageBuilder.withPayload(order).build());

    return ResponseEntity.ok(order);
  }
}

Not much going on here. Just inject the Processor and its channel and send the OrderDTO object through it.

OrderService – Subscribing to Kafka

@Service
@Transactional
public class OrderService {
  ....
  @StreamListener(Processor.INPUT)
  public void registerOrder(OrderDTO order) throws InvalidOrderException {
    ....
  }
  ....
}

Even simpler. The only change is adding the @StreamListener annotation to the registerOrder method, making sure that guy fires every time an order arrives on the topic.

Testing code

The spring-cloud-stream-test-support dependency (test-scoped) enables testing without a connected messaging system. Messages published to a channel can be inspected via the MessageCollector class. I've rewritten the OrderResourceTest class to check that the OrderDTO is published to the output channel when calling the OrderResource:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloBeerApp.class)
public class OrderResourceTest {

  @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
  @Autowired
  private Processor processor;

  @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
  @Autowired
  private MessageCollector messageCollector;

  private MockMvc restMockMvc;

  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);

    OrderResource orderResource = new OrderResource(processor);
    restMockMvc = MockMvcBuilders
      .standaloneSetup(orderResource)
      .build();
  }

  @Test
  public void testProcessOrder() throws Exception {
    OrderItemDTO orderItem1 = new OrderItemDTO(1L, 50L);
    OrderItemDTO orderItem2 = new OrderItemDTO(2L, 50L);
    OrderDTO order = new OrderDTO();
    order.setCustomerId(1L);
    order.setOrderId(1L);
    order.setOrderItems(Arrays.asList(orderItem1, orderItem2));

    restMockMvc.perform(
      post("/api/order/process-order")
        .contentType(TestUtil.APPLICATION_JSON_UTF8)
        .content(TestUtil.convertObjectToJsonBytes(order)))
        .andExpect(status().isOk());

    Message<?> received = messageCollector.forChannel(processor.output()).poll();
    assertNotNull(received);
    assertEquals(order, received.getPayload());
  }

}

In the OrderServiceIntTest I changed one of the test methods so it publishes an OrderDTO message on the (test) channel that the OrderService is subscribed to:

@Test
@Transactional
public void assertOrderOK() throws InvalidOrderException {
  ....
  //orderService.registerOrder(order);
  Message<OrderDTO> message = new GenericMessage<OrderDTO>(order);
  processor.input().send(message);
  ....
}

More information about Spring Cloud Stream testing can be found here.

Wiring it all up

Now let’s see if our beers will flow. So here are our stock levels before:
[Screenshot: Item Stock Levels before the order]

Now post a new (valid) order with Postman:
[Screenshot: posting the order with Postman]
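
If you'd rather replay this step with curl, the request looks roughly like this. Fair warning: the OrderItemDTO field names are my guess based on its constructor in the test code, and I'm leaving out the JWT Authorization header a secured JHipster app normally requires:

curl -X POST http://localhost:8080/api/order/process-order \
  -H "Content-Type: application/json" \
  -d '{
        "orderId": 1,
        "customerId": 1,
        "orderItems": [
          { "itemId": 1, "quantity": 50 },
          { "itemId": 2, "quantity": 50 }
        ]
      }'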

And behold our new stock levels:
[Screenshot: Item Stock Levels after the order]

It still works! So our new setup with a Kafka topic in the middle is working like a charm! Note that this is a very simplistic example. To make it more robust – for one, what about failed orders?! – the first step would be to move the topic consumer code out of the OrderService and into a separate class. That consumer class can delegate processing to an injected OrderService and deal with possible errors, e.g. by moving the order to another topic. And with another topic you need custom interfaces for your channels as well; a sketch of that idea follows below.
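
Just to make that concrete, here's a minimal sketch of such a consumer. Everything in it is hypothetical – the FailedOrderChannel interface, the channel name, the error handling – nothing here is generated code:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

// Hypothetical extra output channel for failed orders. Bind it alongside the
// default channels – @EnableBinding({Processor.class, FailedOrderChannel.class}) –
// and point spring.cloud.stream.bindings.failedOrderOutput.destination at a
// topic like hb-failed-orders.
interface FailedOrderChannel {

    String FAILED_ORDER_OUTPUT = "failedOrderOutput";

    @Output(FAILED_ORDER_OUTPUT)
    MessageChannel failedOrderOutput();
}

@Service
public class OrderConsumer {

    private final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    private final OrderService orderService;
    private final MessageChannel failedOrderChannel;

    public OrderConsumer(OrderService orderService, FailedOrderChannel channels) {
        this.orderService = orderService;
        this.failedOrderChannel = channels.failedOrderOutput();
    }

    // Consume from the default input channel, delegate to the service, and
    // park orders that fail validation on the failed-order topic.
    @StreamListener(Processor.INPUT)
    public void onOrder(OrderDTO order) {
        try {
            orderService.registerOrder(order);
        } catch (InvalidOrderException e) {
            log.warn("Order {} failed, moving it to the failed-order topic", order.getOrderId(), e);
            failedOrderChannel.send(MessageBuilder.withPayload(order).build());
        }
    }
}

The @StreamListener annotation moves to this class, so it disappears from the registerOrder method in OrderService again.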

Summary

In this blog post we introduced a Kafka topic to separate our Order clients from our Order processing. With the help of Spring Cloud Stream this is very easy to do. We also looked at a few ways to test messaging with Spring Cloud Stream.
The plan was to say goodbye to JHipster for now, but maybe I'll do one more blog post. I wanna find out how easy it is to switch from Kafka to RabbitMQ, or maybe improve upon this version and introduce a failed-order topic. I also wanna test how easy it is to upgrade this JHipster app to the latest version. So many ideas, so little time! Anyhow, let's grab a beer first and think about that next blog!

References