Now that our OrderService is up and running, it’s time to make it a little more robust and decoupled. In this blog post we’re gonna put Kafka in between the OrderResource controller and our Spring Boot back-end system and use Spring Cloud Stream to ease development.
Upon creation of a JHipster application you will be given the option to select Asynchronous messages using Apache Kafka. After generation your pom file and application.yml will be all set up for using Kafka and Spring Cloud Stream. You’ll also get a Docker Compose file to spin up Kafka (and Zookeeper), and a MessagingConfiguration class will be generated. There you need to declare your input and output channels (channels are Spring Cloud Stream abstractions, they’re the connection between the application and the message broker). If you follow the JHipster documentation on Kafka right after generating a fresh JHipster app, you should get a working flow up in no time.
Now, I wanna further improve upon the current HelloBeer™ application we finished in my previous blog post, and I didn’t check the Asynchronous messages option when I initially created the application. It’s not possible to add the option afterwards via the CLI, but luckily it’s not really that hard to add the necessary components manually. So let’s get started and make those beer orders flow through a Kafka topic straight into our back-end application.
As always the finished code can be found on GitHub.
Kafka Docker image
Alright this guy I just ripped from a new JHipster app with the messaging option enabled. Add this kafka.yml file to the src/main/docker directory:
version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper:3.4.6
        ports:
            - 2181:2181
    kafka:
        image: wurstmeister/kafka:1.0.0
        environment:
            KAFKA_ADVERTISED_HOST_NAME: localhost
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_CREATE_TOPICS: "topic-jhipster:1:1"
        ports:
            - 9092:9092
You can spin up Kafka now with this file by issuing the following command:
docker-compose -f src/main/docker/kafka.yml up -d
Adding the dependencies
The following dependencies are needed to enable Spring Cloud Stream and have it integrate with Kafka:
<!-- Kafka support -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
Configuring the channels
Since we only need one Kafka topic, we can use the default channels that Spring Cloud Stream has to offer. We need one input and one output channel, so we can use the combined Processor interface. For a more complex setup with multiple topics, you can write your own custom interfaces for the channels (this is also the practice in the JHipster documentation example). For more information about channels, check the Spring Cloud Stream Reference Guide.
MessagingConfiguration
First add the configuration for the Processor channel. This is done in the MessagingConfiguration class. We’ll add this guy to the config package, the place where JHipster stores all Spring Boot configuration.
package nl.whitehorses.hellobeer.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;

@EnableBinding(value = Processor.class)
public class MessagingConfiguration {
}
application-dev.yml
The overall configuration needs a few properties to let the application know where to find Kafka and to let Spring Cloud Stream channels bind to a Kafka topic. Let’s call the topic hb-orders. I’ll only put the configuration in the development configuration – application-dev.yml – for now:
spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
            bindings:
                output:
                    destination: hb-orders
                    content-type: application/json
                input:
                    destination: hb-orders
Note that input and output are the default channel names when working with the default channel interfaces.
That’s it for the channel configuration. Now we can use them in our back-end code.
OrderResource – Publishing to Kafka
Let’s alter our OrderResource so it publishes the OrderDTO object to the output channel instead of calling the OrderService directly:
@RestController
@RequestMapping("/api/order")
public class OrderResource {

    private static final String ENTITY_NAME = "order";

    private final Logger log = LoggerFactory.getLogger(OrderResource.class);

    private MessageChannel channel;

    public OrderResource(final Processor processor) {
        this.channel = processor.output();
    }

    @PostMapping("/process-order")
    @Timed
    public ResponseEntity processOrder(@Valid @RequestBody OrderDTO order) {
        log.debug("REST request to process Order : {}", order);
        if (order.getOrderId() == null) {
            throw new BadRequestAlertException("Error processing order", ENTITY_NAME, "orderfailure");
        }
        channel.send(MessageBuilder.withPayload(order).build());
        return ResponseEntity.ok(order);
    }
}
Not much going on here. Just inject the Processor and its channel and send the OrderDTO object through it.
OrderService – Subscribing to Kafka
@Service
@Transactional
public class OrderService {
    ....

    @StreamListener(Processor.INPUT)
    public void registerOrder(OrderDTO order) throws InvalidOrderException {
        ....
    }

    ....
}
Even simpler. The only change is adding the StreamListener annotation to the registerOrder method, making sure that guy fires every time an order arrives at the topic.
Testing code
The spring-cloud-stream-test-support dependency (test-scoped) enables testing without a connected messaging system. Messages published to topics can be inspected via the MessageCollector class. I’ve rewritten the OrderResourceTest class to check if the OrderDTO is published to the message channel when calling the OrderResource:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloBeerApp.class)
public class OrderResourceTest {

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    private Processor processor;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    private MessageCollector messageCollector;

    private MockMvc restMockMvc;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);

        OrderResource orderResource = new OrderResource(processor);
        restMockMvc = MockMvcBuilders
            .standaloneSetup(orderResource)
            .build();
    }

    @Test
    public void testProcessOrder() throws Exception {
        OrderItemDTO orderItem1 = new OrderItemDTO(1L, 50L);
        OrderItemDTO orderItem2 = new OrderItemDTO(2L, 50L);
        OrderDTO order = new OrderDTO();
        order.setCustomerId(1L);
        order.setOrderId(1L);
        order.setOrderItems(Arrays.asList(orderItem1, orderItem2));

        restMockMvc.perform(
            post("/api/order/process-order")
                .contentType(TestUtil.APPLICATION_JSON_UTF8)
                .content(TestUtil.convertObjectToJsonBytes(order)))
            .andExpect(status().isOk());

        Message<?> received = messageCollector.forChannel(processor.output()).poll();
        assertNotNull(received);
        assertEquals(received.getPayload(), order);
    }
}
In the OrderServiceIntTest I changed one of the test methods so it publishes an OrderDTO message on the (test) channel that the OrderService is subscribed to:
@Test
@Transactional
public void assertOrderOK() throws InvalidOrderException {
    ....
    //orderService.registerOrder(order);
    Message<OrderDTO> message = new GenericMessage<OrderDTO>(order);
    processor.input().send(message);
    ....
}
More information about Spring Cloud Stream testing can be found in the Testing section of the Spring Cloud Stream Reference Guide.
Wiring it all up
Now let’s see if our beers will flow. So here are our stock levels before:
Now post a new (valid) order with Postman:
And behold our new stock levels:
It still works! So our new setup with a Kafka topic in the middle is working like a charm! Note that this is a very simplistic example. To make it more robust (what about failed orders, for one?!), the first step would be to move the topic consumer code away from the OrderService and put it in a separate class. That consumer class can delegate processing to an injected OrderService and deal with possible errors, e.g. by moving the order to another topic. And with another topic you need custom interfaces for your channels as well.
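To give an idea of what that separate consumer class could look like, here is a minimal plain-Java sketch. It deliberately leaves Spring out so it runs on its own: OrderMessage, OrderRegistrar, OrderConsumer and the failedOrders callback are hypothetical names made up for this illustration, and the InvalidOrderException below is just a stand-in for the one in the HelloBeer code. In the real app the onOrder method would presumably carry the StreamListener annotation, and the callback would be a second output channel bound to a failed-order topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the post's OrderDTO; only the id matters here.
class OrderMessage {
    final Long orderId;

    OrderMessage(Long orderId) {
        this.orderId = orderId;
    }
}

// Stand-in for the exception thrown by OrderService.registerOrder.
class InvalidOrderException extends Exception {
    InvalidOrderException(String message) {
        super(message);
    }
}

// Hypothetical interface: the one OrderService method the consumer needs.
interface OrderRegistrar {
    void registerOrder(OrderMessage order) throws InvalidOrderException;
}

// Consumer that delegates to the service and reroutes orders it cannot process.
class OrderConsumer {
    private final OrderRegistrar orderService;
    // Assumption: stands in for an output channel bound to a failed-order topic.
    private final Consumer<OrderMessage> failedOrders;

    OrderConsumer(OrderRegistrar orderService, Consumer<OrderMessage> failedOrders) {
        this.orderService = orderService;
        this.failedOrders = failedOrders;
    }

    // In the Spring application this is where the listener annotation would go.
    void onOrder(OrderMessage order) {
        try {
            orderService.registerOrder(order);
        } catch (InvalidOrderException e) {
            // Hand the order over instead of letting the exception escape the listener.
            failedOrders.accept(order);
        }
    }
}

class OrderConsumerDemo {
    public static void main(String[] args) {
        List<OrderMessage> failed = new ArrayList<>();
        // Toy service that rejects orders without an id.
        OrderRegistrar rejectsMissingId = order -> {
            if (order.orderId == null) {
                throw new InvalidOrderException("order id is missing");
            }
        };
        OrderConsumer consumer = new OrderConsumer(rejectsMissingId, failed::add);
        consumer.onOrder(new OrderMessage(1L));   // processed normally
        consumer.onOrder(new OrderMessage(null)); // rerouted to the failed list
        System.out.println("Failed orders: " + failed.size());
    }
}
```

The nice thing about this split is that the OrderService no longer knows anything about messaging, so it can be unit tested without a channel, while the consumer is the only place that decides what happens to a bad order.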
Summary
In this blog post we introduced a Kafka topic to separate our Order clients from our Order processing. With the help of Spring Cloud Stream this is very easy to do. We also looked at a few ways to test messaging with Spring Cloud Stream.
The plan was to say goodbye to JHipster for now, but maybe I’ll do one more blog post. I wanna find out how easy it is to switch from Kafka to RabbitMQ or maybe improve upon this version and introduce a failed-order topic. I also wanna test how easy it is to upgrade this JHipster app to the latest version. So many ideas, so little time! Anyhow, let’s grab a beer first and think about that next blog!