Playing around with Camel AsyncProcessor

One of the most frequently used constructs in Apache Camel is the Processor (http://camel.apache.org/processor.html). It is often used for invoking custom code or performing message translations. The API of the Processor is clear and well documented, and numerous examples of its use are available. The lesser-known sibling of the Processor is the AsyncProcessor (http://camel.apache.org/asynchronous-processing.html), which is less documented and less frequently used, mainly because it is targeted at Camel component developers. Recently, however, I decided to play around with the AsyncProcessor in a regular Camel setup. In this blog I would like to explain one possible way to use the AsyncProcessor in a Camel route.

Creating an AsyncProcessor

Similar to creating a Processor, creating an AsyncProcessor starts by implementing the AsyncProcessor interface.


public class MyAsyncProcessor implements AsyncProcessor {

But instead of one process method, now two must be implemented:


@Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {

@Override public void process(Exchange exchange) {

Next to the regular process method, a process method which takes an AsyncCallback must be implemented. The AsyncCallback is invoked when the asynchronous execution (which must be started in a separate thread) is finished. The boolean return value tells the Camel routing engine whether the exchange was completed synchronously (true), in which case routing continues on the calling thread, or will be completed asynchronously (false), in which case routing continues once the callback is invoked.
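The calling convention can be illustrated without Camel on the classpath. The sketch below is not Camel code: DoneCallback is a hypothetical stand-in for Camel's AsyncCallback, and ContractSketch with its two methods is made up for illustration. It assumes the convention that a true return value means the work finished on the calling thread and the callback has already been invoked, while false means the callback will be invoked later from another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Camel's AsyncCallback; not part of any library.
interface DoneCallback {
  void done(boolean doneSync);
}

final class ContractSketch {
  // Daemon thread so the sketch does not keep the JVM alive.
  private static final ExecutorService POOL = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "contract-sketch");
    t.setDaemon(true);
    return t;
  });

  // Completes on the calling thread: invoke the callback with true, then return true.
  static boolean processSync(AtomicReference<String> result, DoneCallback callback) {
    result.set("sync");
    callback.done(true);
    return true;
  }

  // Hands the work to another thread: return false now, invoke the callback with false later.
  static boolean processAsync(AtomicReference<String> result, DoneCallback callback) {
    POOL.submit(() -> {
      result.set("async");
      callback.done(false);
    });
    return false;
  }

  public static void main(String[] args) {
    AtomicReference<String> result = new AtomicReference<>();
    CompletableFuture<Boolean> seen = new CompletableFuture<>();
    boolean returned = processAsync(result, doneSync -> seen.complete(doneSync));
    // join() blocks until the background thread has invoked the callback
    System.out.println("returned=" + returned + ", doneSync=" + seen.join() + ", result=" + result.get());
  }
}
```

In both variants the flag passed to done matches the value returned from the process method, which is the part of the contract that is easiest to get wrong.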

Starting the Async job

In order to start an async job, the execution must take place in another thread. Java nowadays offers multiple ways to run code concurrently on multiple threads. For this example we simply create a Runnable class and start the job via an executor service.


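private class AsyncBackgroundProcess implements Runnable {
  private final Exchange exchange;
  private final AsyncCallback asyncCallback;
  public AsyncBackgroundProcess(Exchange exchange, AsyncCallback asyncCallback){
    this.exchange = exchange;
    this.asyncCallback = asyncCallback;
  }
  @Override public void run() {
    log.info("Async backend process started");
    try {
      // Slow call; its result is stored on the exchange for later retrieval
      Boolean getBoolean = slowExecutionInterface.getBoolean("bla");
      exchange.setProperty("Response", getBoolean);
      log.info("Async backend process completed");
    } catch (Exception e) {
      // Record the failure on the exchange instead of losing it in the worker thread
      exchange.setException(e);
    } finally {
      // Always signal completion, otherwise a waiting thread would never be released
      asyncCallback.done(false);
    }
  }
}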

There are two things to note here:

  1. The AsyncCallback object from the process method is passed to the Runnable class
  2. When the execution is finished the asyncCallback.done method is called
    1. The false parameter indicates if the execution is handled synchronously (true) or asynchronously (false)
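Since Java offers several ways to run work on another thread, here is one alternative to a hand-written Runnable, sketched with CompletableFuture from the JDK. The BackgroundJobSketch class, the slowLookup method and the Runnable onDone parameter (a stand-in for calling asyncCallback.done(false)) are all hypothetical. The point of the design is that whenComplete runs whether the job succeeds or throws, so the completion signal is never skipped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class BackgroundJobSketch {
  // Daemon threads so the sketch does not keep the JVM alive.
  private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
    Thread t = new Thread(r, "background-job");
    t.setDaemon(true);
    return t;
  });

  // Runs the job on the pool; onDone fires on success and on failure alike.
  static <T> CompletableFuture<T> start(Supplier<T> job, Runnable onDone) {
    return CompletableFuture.supplyAsync(job, POOL)
        .whenComplete((value, error) -> onDone.run());
  }

  // Hypothetical slow backend call.
  static Boolean slowLookup(String input) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !input.isEmpty();
  }

  public static void main(String[] args) {
    CompletableFuture<Boolean> future =
        start(() -> slowLookup("bla"), () -> System.out.println("done signalled"));
    // join() waits for the background job and rethrows its failure, if any
    System.out.println("response=" + future.join());
  }
}
```

The returned future also carries the result or the failure, so the caller does not need a separate shared variable to pick up the response.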

Implementing the AsyncProcessor process method

In the AsyncProcessor process method we kick off the Runnable class and define the callback method:


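@Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {
  log.info("Async process started");
  // The latch lets a later step in the route wait for the background job
  CountDownLatch countDownLatch = new CountDownLatch(1);
  exchange.setProperty("countDownLatch", countDownLatch);
  // The engine-supplied asyncCallback is not used; a local callback counts down the latch instead
  executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
    @Override public void done(boolean b) {
      log.info("Async backend process finished");
      exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
    }
  }));
  // Returning true reports the exchange as completed synchronously, so routing continues on the calling thread while the job is still running
  return true;
}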

Using the AsyncProcessor in a Camel route

Using an AsyncProcessor in a Camel route is exactly the same as using a regular Processor in the route:


from(jettyEndpoint)
.log("received")
.process(myAsyncProcessor)

Getting a response

By default the AsyncProcessor triggers a new thread for execution and does not sync back to the main execution thread. So getting a response in a “Fork-Join” manner requires some additional work. In the implementation of the process method above some actions for getting a response are already present:

A CountDownLatch is used for defining the number of threads the main thread can wait for (in our case 1):


CountDownLatch countDownLatch = new CountDownLatch(1);

In order to use the CountDownLatch downstream in our Camel route, we save it to an exchange property:


exchange.setProperty("countDownLatch", countDownLatch);

When submitting the Runnable class to a new thread, we define the AsyncCallback and what it does when its done method is invoked: it counts down the CountDownLatch, indicating that the background thread is finished:


executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
    @Override public void done(boolean b) {
      log.info("Async backend process finished");
      exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
    }
  }));

But this alone does not synchronize our threads. In order to get a response in our Camel route or Processor, the execution must at some point wait for the background thread to complete. For this a helper method was created:


public static void getResponseInBody(Exchange exchange) {
  CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
  exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
  log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
  exchange.getIn().setBody(exchange.getProperty("Response", String.class));
}

In this helper method the CountDownLatch is retrieved from the exchange and the Camel AwaitManager is used for the thread synchronization:

CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);

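Since this helper method is static, it can be invoked from anywhere in the route or processor, which gives the flexibility to choose where in the route the threads are synchronized. The route below uses the getResponse variant of the helper, which returns the response instead of setting it as the body: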


@Override
public void configure() throws Exception {
from(jettyEndpoint)
.log("received")
.process(myAsyncProcessor)
.log("exited processor")
.bean(MyAsyncProcessor.class, "getResponse(${exchange})")
.log("${body} and Response Property ${property.Response}");
}
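The latch handshake from this section can also be tried with plain JDK classes. The sketch below is not the Camel AsyncProcessorAwaitManager; it calls CountDownLatch.await with a timeout directly, and the LatchHandshakeSketch class, the fetch method and the job durations are hypothetical. The bounded wait is an assumption added here so that the caller does not block forever if the background job never signals completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchHandshakeSketch {
  // Daemon threads so the sketch does not keep the JVM alive.
  private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
    Thread t = new Thread(r, "latch-sketch");
    t.setDaemon(true);
    return t;
  });

  // Starts a job that takes jobMillis, then waits at most timeoutMillis for it.
  // Returns the response, or null if the job did not finish in time.
  static String fetch(long jobMillis, long timeoutMillis) {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> response = new AtomicReference<>();
    POOL.submit(() -> {
      try {
        Thread.sleep(jobMillis);          // simulated slow backend
        response.set("ready");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        latch.countDown();                // always release the waiting thread
      }
    });
    // The caller is free to do other work here before it joins.
    try {
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? response.get() : null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(fetch(50, 2000));  // job finishes well within the timeout
    System.out.println(fetch(2000, 50));  // job is still running when the wait gives up
  }
}
```

Counting down in a finally block mirrors the role of the done callback: the waiting side is released even when the job fails.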

The entire AsyncProcessor looks like this:


package nl.rubix.eos.poc.asyncprocessor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Named("myAsyncProcessor")
public class MyAsyncProcessor implements AsyncProcessor {
  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
  private static Logger log = LoggerFactory.getLogger(MyAsyncProcessor.class);
  @Inject
  @Named("slowExecutionImpl")
  private SlowExecutionInterface slowExecutionInterface;
  @Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {
    log.info("Async process started");
    CountDownLatch countDownLatch = new CountDownLatch(1);
    exchange.setProperty("countDownLatch", countDownLatch);
    executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
      @Override public void done(boolean b) {
        log.info("Async backend process finished");
        exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
      }
    }));
    return true;
  }
  @Override public void process(Exchange exchange) throws Exception {
    throw new IllegalStateException("Should never be called");
  }
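  private class AsyncBackgroundProcess implements Runnable {
    private final Exchange exchange;
    private final AsyncCallback asyncCallback;
    public AsyncBackgroundProcess(Exchange exchange, AsyncCallback asyncCallback){
      this.exchange = exchange;
      this.asyncCallback = asyncCallback;
    }
    @Override public void run() {
      log.info("Async backend process started");
      try {
        // Slow call; its result is stored on the exchange for later retrieval
        Boolean getBoolean = slowExecutionInterface.getBoolean("bla");
        exchange.setProperty("Response", getBoolean);
        log.info("Async backend process completed");
      } catch (Exception e) {
        // Record the failure on the exchange instead of losing it in the worker thread
        exchange.setException(e);
      } finally {
        // Always signal completion, otherwise a waiting thread would never be released
        asyncCallback.done(false);
      }
    }
  }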
  public static void getResponseInBody(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
    exchange.getIn().setBody(exchange.getProperty("Response", String.class));
  }
  public static void getResponseInProperty(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
  }
  public static Boolean getResponse(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
    return exchange.getProperty("Response", Boolean.class);
  }
}

The entire Camel route looks like this:


package nl.rubix.api.poc;
import nl.rubix.eos.poc.asyncprocessor.MyAsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.ContextName;
import org.apache.camel.cdi.Uri;
import javax.inject.Inject;
import javax.inject.Named;
/**
 * Configures all our Camel routes, components, endpoints and beans
 */
@ContextName("myJettyCamel")
public class MyJettyRoute extends RouteBuilder {
    @Inject @Uri("jetty:http://0.0.0.0:8080/async/test")
    private Endpoint jettyEndpoint;
    @Inject
    @Named("myAsyncProcessor")
    MyAsyncProcessor myAsyncProcessor;
    @Override
    public void configure() throws Exception {
        from(jettyEndpoint)
            .log("received")
            .process(myAsyncProcessor)
            .log("exited processor")
            .bean(MyAsyncProcessor.class, "getResponse(${exchange})")
            .log("${body} and Response Property ${property.Response}");
    }
}

To simulate slow execution, a simple interface and a corresponding implementation were used:


package nl.rubix.eos.poc.asyncprocessor;
public interface SlowExecutionInterface {
  Boolean getBoolean(String input);
}

package nl.rubix.eos.poc.asyncprocessor;
import javax.inject.Named;
@Named("slowExecutionImpl")
public class SlowExecutionInterfaceMock implements SlowExecutionInterface {
  @Override public Boolean getBoolean(String input) {
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption
      Thread.currentThread().interrupt();
    }
    return true;
  }
}