Part 3 – Enter ReactiveX

In my previous post I showed you how to call REST APIs using the Vert.x WebClient. The resulting code looked a bit messy, with a lot of “if then else” statements. Luckily, the people behind Vert.x have added ReactiveX support to Vert.x, which means we can use the observable pattern to clean up the code. As explained in Reactive Micro services, an observable can be based on anything. An HTTP call to a REST API is a perfect candidate for the “Single” observable variant: like the HTTP call, a Single produces either one result or an error.

Changing the client to a reactive client is easy. The first thing you need to do is make sure you are using the right packages.

You can import the Vert.x WebClient from two packages:

import io.vertx.reactivex.ext.web.client.WebClient;
// or
import io.vertx.ext.web.client.WebClient;

As you might expect, we need the WebClient from the reactivex packages. With this client we can send an HTTP request and get an observable of the type “io.reactivex.Single” as a response. The following example contains the code to call the Public Flight API:

    private Single<HttpResponse<FlightList>> callPublicFlight(String flightName) {
        logger.info(String.format("Calling public flight endpoint at: https://api.schiphol.nl/public-flights/flights with flightname %s", flightName));
        // Build the request; BodyCodec.json decodes the response body into a FlightList.
        HttpRequest<FlightList> request = client
                .getAbs("https://api.schiphol.nl/public-flights/flights")
                .addQueryParam("app_id", APP_ID)
                .addQueryParam("app_key", APP_KEY)
                .addQueryParam("flightname", flightName)
                .addQueryParam("flightdirection", "D")
                .putHeader("Accept", "application/json")
                .putHeader("ResourceVersion", "v3")
                .as(BodyCodec.json(FlightList.class));

        // rxSend() returns a Single instead of taking a handler.
        return request.rxSend();
    }

As you can see, not a lot has changed. The main differences are the final statement, where we send the request with rxSend(), and the return type of the method. Instead of passing a handler along in the signature, we get a Single observable as the return type.

The biggest changes are in handling the response of the API calls. This was previously done in the handler. Now it can be done via the returned observable. The “if then else” statements are replaced by the following code. As you will notice, it no longer contains checks on whether the REST calls were successful: a failed call surfaces as an error on the Single, skips the remaining operators and ends up in the error handler of the subscriber.

    private Single<ResponseModel> calculateTimeToDeparteToTheAirport(String flightId, String origin) {
        Single<ResponseModel> response = callPublicFlight(flightId).flatMap(flightListHttpResponse -> {
            // Named responseModel because a lambda local may not reuse the name of the enclosing "response" variable.
            ResponseModel responseModel = createResponseModel(flightListHttpResponse);
            return getTravelTimeByCar(origin, responseModel.getTimeToBeAtSchiphol()).map(travelTimeResponse -> {
                addTimeToLeave(responseModel, travelTimeResponse);
                return responseModel;
            });
        });
        return response;
    }

At first glance the method contains a lot of return statements and is still not really “clean”. I left it like this for now for the sake of explaining the code. Skip to the next example to see the cleaned-up version.

The first thing we do is call the “callPublicFlight” method. This method returns the Single observable. Now you have two options. The first is to simply start the observable by subscribing to it and handling the response in the “onSuccess”. However, you will then end up with something similar to the already mentioned “callback hell”. The second option is to chain the observables. With the help of “operators” you can transform or process the emitted result, or switch to another observable. In other words, you can use the operators to chain multiple observables in a “builder pattern” fashion.

In our case, when “callPublicFlight” returns a result we need to process that result and call “getTravelTimeByCar”. Normally you would use the “map” operator for this. However, “getTravelTimeByCar” also returns an observable. This means that if you use the “map” operator you would receive a Single that contains a Single (Single<Single>). To prevent this, use the “flatMap” operator. This operator will reduce/flatten the Single<Single> into a Single.
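If the Single<Single> problem feels abstract, you can try the same thing without any Vert.x setup. The sketch below is not RxJava: it assumes the JDK's CompletableFuture as a stand-in, where “thenApply” plays the role of “map” and “thenCompose” the role of “flatMap”. The names lookupDeparture and timeToLeave and the numbers in them are made up for this illustration and are not part of the flight service.

```java
import java.util.concurrent.CompletableFuture;

class FlatMapSketch {

    // Hypothetical stand-in for callPublicFlight: resolves to a departure time in minutes after midnight.
    static CompletableFuture<Integer> lookupDeparture(String flightName) {
        return CompletableFuture.completedFuture(600); // pretend the flight leaves at 10:00
    }

    // Hypothetical stand-in for getTravelTimeByCar: it is asynchronous as well, so it returns a future too.
    static CompletableFuture<Integer> timeToLeave(int departureMinutes) {
        int travelMinutes = 45; // assumed travel time, for illustration only
        return CompletableFuture.completedFuture(departureMinutes - travelMinutes);
    }

    // thenApply (the "map" role) wraps the inner future: the result is a future of a future.
    static CompletableFuture<CompletableFuture<Integer>> nested() {
        return lookupDeparture("KL1001").thenApply(FlatMapSketch::timeToLeave);
    }

    // thenCompose (the "flatMap" role) flattens the inner future into a single level.
    static CompletableFuture<Integer> flattened() {
        return lookupDeparture("KL1001").thenCompose(FlatMapSketch::timeToLeave);
    }

    public static void main(String[] args) {
        // The nested variant needs two joins to reach the value, the flattened one needs just one.
        System.out.println(nested().join().join());
        System.out.println(flattened().join());
    }
}
```

Both variants end up with the same number, but only the flattened one gives the next operator in the chain a plain value to work with. That is the reason for reaching for “flatMap” as soon as the function you pass in returns an observable itself.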

Now “getTravelTimeByCar” will return the time you have to leave in order to be on time at Schiphol. With the “map” operator we can extract the result and create the final response. This response is then returned and, if all goes well, it will be the only item the Single observable emits through “onSuccess”.

After some refactoring, the return statements are gone:


            Single<ResponseModel> responseModelSingle = callPublicFlight(flightId)
                    .map(this::createResponseModel)
                    .flatMap(responseModel -> getTimeToLeave(origin, responseModel));

I have added the “map” operator to first create the initial response model, and I have moved the calling and parsing of the Google API to the getTimeToLeave method. This way the actual logic of the service has become just a couple of lines of code that are easy to read.

Now the last part is subscribing to the actual observable, because without a subscriber this observable will emit nothing.

            responseModelSingle.subscribe(
                    responseModel -> routingContext.response()
                        .putHeader("Content-Type", "application/json")
                        .end(mapper.writeValueAsString(responseModel)), 
                    error -> routingContext.response()
                        .setStatusCode(500)
                        .end(error.getMessage()));
            };

Because we have a Single, we get either a result or an error. When “onSuccess” fires (with the responseModel) we return the JSON, and when we receive an error we return a 500 with a (custom) error message.
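To see why the chain needs no success checks between the calls, here is a minimal sketch of the two outcomes. Again this is not RxJava: it assumes the JDK's CompletableFuture as a stand-in, with “handle” taking roughly the place of the two callbacks passed to “subscribe”. Keep in mind that a CompletableFuture starts right away, while a Single waits for a subscriber. The names firstCall, secondCall and respond, and the messages they produce, are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class ErrorChannelSketch {

    // Records whether the second step ran, so we can observe that it is skipped on failure.
    static final AtomicBoolean secondStepRan = new AtomicBoolean(false);

    // Hypothetical first call; fails on request, like an HTTP call that errors out.
    static CompletableFuture<String> firstCall(boolean fail) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (fail) {
            result.completeExceptionally(new IllegalStateException("flight API unavailable"));
        } else {
            result.complete("KL1001");
        }
        return result;
    }

    // Hypothetical second call, only meaningful when the first one succeeded.
    static CompletableFuture<String> secondCall(String flight) {
        secondStepRan.set(true);
        return CompletableFuture.completedFuture("leave at 09:15 for " + flight);
    }

    // Dependent stages wrap the original exception, so fall back to the cause when there is one.
    static Throwable unwrap(Throwable error) {
        return error.getCause() != null ? error.getCause() : error;
    }

    // Chains both calls and turns the outcome into an HTTP-like "status: body" string.
    static String respond(boolean fail) {
        secondStepRan.set(false);
        return firstCall(fail)
                .thenCompose(ErrorChannelSketch::secondCall) // skipped when firstCall failed
                .handle((body, error) -> error == null
                        ? "200: " + body
                        : "500: " + unwrap(error).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(respond(false));
        System.out.println(respond(true));
    }
}
```

When the first call fails, the second call is never invoked and the error travels straight to the end of the chain. That is why one error handler at the end is enough.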

Now I have shown you how you can use ReactiveX to minimize the callback hell and how to chain and manipulate observables. In my next post I will show you how you can zip or merge observables and create a reactive multi-threaded beast.