Lately I have been working as a full-stack developer on an Angular front end combined with some Spring Boot backend services. Very hip and trendy if you ask me, and a lot of fun to do.
One of the issues I faced was a service call that took forever to complete. The first response of the operation came back within a few seconds, but the final processing took a lot longer. Normally this wouldn’t be a problem, but we didn’t want to bother the user unless the operation failed for some reason. So we needed a way to send a message to the user without blocking anything, so that the user could keep on working.
For this we first looked at WebSockets. However, the communication would be one-way only, so why bother with the extra complexity of setting up a WebSocket connection? So we looked for something closer to the current setup (HTTP, REST-style communication) and came up with Server-sent Events. They are not too complex, and lots of posts can be found on how to use them. However, all examples stop at the point where it gets interesting: how can you emit a new event from another (async) process? All examples simply loop and emit an event every second. Easy! In this post I will show you how to set up a Server-sent Events (from now on SSE) endpoint with Spring Boot and how to emit your own cool event by means of a bean.
Server
First off we need to create an endpoint capable of handling SSE. This can be done in a Controller class:
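import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

// @RestController so the returned Flux is written to the response body instead of being resolved as a view
@RestController
public class TemplateNotificationSocket {
    @GetMapping(path = "/notification", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Notification> startSSE() {
        // createNotification(Long) is assumed to be defined elsewhere in this class
        return Flux.interval(Duration.ofSeconds(1)).map(this::createNotification);
    }
}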
There are a few things to notice in this class. First, the GetMapping: it produces TEXT_EVENT_STREAM_VALUE; in other words, we are telling Spring that we want to emit events. The next thing to notice is the Flux<Notification> object that startSSE returns. Flux comes from Project Reactor, the reactive library that Spring WebFlux builds on, and is based on the Reactive concept (explained in a previous post: <Link goes here>). For my loyal readers, or those who have used a reactive framework like RxJava or RxJS: Flux is like an Observable, only you call it Flux.
With the Flux object we can emit Notification objects, exactly what we want! By returning the Flux, a Notification is sent to the client for every element the Flux emits. In this example we emit a new notification each second. After running the application and going to http://localhost:8080/notification you will see a new notification (some JSON string) appear in your browser each second. Cool, now we need to change it so that we only emit notifications that make sense!
So this is the part where the posts on SSE stop and I had to look for a solution. I found it in the EmitterProcessor class:
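As an aside, what the browser shows on that URL is the raw event stream: every event arrives as one or more data: lines followed by a blank line, and Spring writes each Notification as JSON into that data field. The sketch below parses such a chunk by hand, purely to illustrate the format. ServerNotification and parseEventStream are hypothetical names, and the message field is assumed from the getMessage() call used later in this post.

```typescript
// Hypothetical shape; only `message` is known from the post, other fields are unknown.
interface ServerNotification {
  message: string;
}

// Parse a chunk of text/event-stream into the JSON payloads of its events.
function parseEventStream(raw: string): ServerNotification[] {
  const notifications: ServerNotification[] = [];
  // Events are separated by a blank line.
  for (const block of raw.split(/\r?\n\r?\n/)) {
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('data:')) {
        // One optional space after the colon is not part of the value.
        const value = line.slice(5);
        dataLines.push(value.startsWith(' ') ? value.slice(1) : value);
      }
      // Other fields (id:, event:, retry:) and comment lines are ignored in this sketch.
    }
    if (dataLines.length > 0) {
      notifications.push(JSON.parse(dataLines.join('\n')));
    }
  }
  return notifications;
}

const sample = 'data:{"message":"first"}\n\ndata: {"message":"second"}\n\n';
const parsed = parseEventStream(sample);
```

In a real client you would not write this yourself; the browser's EventSource does the parsing and hands you event.data.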
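@RestController
public class TemplateNotificationSocket {

    // Publisher shared by all SSE subscribers (reactor.core.publisher.EmitterProcessor)
    private final EmitterProcessor<Notification> processor = EmitterProcessor.<Notification>create();

    @GetMapping(path = "/notification", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Notification> startSSE() {
        return Flux.from(processor);
    }

    // Called by other beans to push a notification to the SSE consumers
    public void emitNotification(Notification notification) {
        processor.onNext(notification);
    }
}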
The EmitterProcessor is a publisher that allows you to emit an event any time you want by simply calling the onNext method. As you can see in the example, I have changed the implementation so that the Flux object is created from the processor, and each time somebody calls the emitNotification method a new event is sent to the consumers of the SSE. Note that Reactor 3.4 deprecated EmitterProcessor in favor of the Sinks API, so on newer versions you will want to look there instead.
All right, the only thing we need to do now is send an actual event. As it is a Controller class, you can simply inject it into the class that needs to emit the event and call the emitNotification method with your notification. If anybody has a running session, they will receive the notification. But what if you are only interested in certain notifications? Since we are using Flux, this is an easy thing to do.
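To make the idea of "emit whenever you want" concrete outside of Reactor, here is a conceptual sketch in TypeScript, the language used for the client later in this post. SimpleEmitter is a hypothetical class, not how Reactor implements its processor: it leaves out buffering, backpressure, completion, and error handling, and only shows that onNext pushes a value to whoever is subscribed at that moment.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical, heavily simplified stand-in for a publisher you can push into.
class SimpleEmitter<T> {
  private readonly listeners = new Set<Listener<T>>();

  // Register a consumer; the returned function removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => { this.listeners.delete(listener); };
  }

  // Push a value to every consumer that is subscribed right now.
  onNext(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

const emitter = new SimpleEmitter<string>();
const received: string[] = [];

emitter.onNext('nobody is listening yet');   // dropped in this sketch
const unsubscribe = emitter.subscribe(value => { received.push(value); });
emitter.onNext('processing failed');         // delivered
unsubscribe();
emitter.onNext('sent after unsubscribe');    // dropped again
```

In this sketch, values pushed while nobody is subscribed are simply lost; what the real processor does in that situation depends on its buffering settings.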
@GetMapping(path = "/notification/{filter}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Notification> startSSE(@PathVariable("filter") String filter) {
    return Flux.from(processor).filter(notification -> notification.getMessage().contains(filter));
}
By adding the PathVariable we can add a constraint / filter. Just like the Observable, the Flux object supports all kinds of operators, and one of them is the filter operator. So when we start the Flux subscription with our REST call we add the filter, and now only the events whose message contains the filter text are returned.
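Seen from the calling side, two details of this filter are worth spelling out: the match is a plain, case-sensitive substring test (that is what Java's String.contains does), and the filter value travels as a path segment, so it should be URL-encoded. The following is a minimal sketch of both; FilterableNotification, passesFilter, and notificationPath are hypothetical names, and the sample messages are made up.

```typescript
// Hypothetical mirror of the server-side Notification; only `message` is known from the post.
interface FilterableNotification {
  message: string;
}

// Same test as notification.getMessage().contains(filter): case-sensitive substring match.
function passesFilter(notification: FilterableNotification, filter: string): boolean {
  return notification.message.includes(filter);
}

// Build the request path; the filter is encoded because it is part of the URL path.
function notificationPath(filter: string): string {
  return '/notification/' + encodeURIComponent(filter);
}

// Made-up sample messages, only for illustration.
const samples: FilterableNotification[] = [
  { message: 'import failed for module billing' },
  { message: 'export finished for module shipping' },
];

const matching = samples.filter(n => passesFilter(n, 'billing'));
// matching contains only the first sample

const path = notificationPath('billing import');
// path is '/notification/billing%20import'
```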
That is it, a great way to send events from your backend to the frontend.
Client
Now from a client perspective, in my case Angular with TypeScript, you can easily consume the SSE. With the help of the EventSource class you have it up and running in a few minutes. After some testing I noticed one problem: connection dropping. By default EventSource has a retry mechanism of its own, but in some browsers it wasn’t working for us. The solution I found to work best was to create a reconnect mechanism with some back-off policy.
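What such a back-off policy could look like is sketched below as a capped exponential delay. This is an illustration under assumptions, not the policy from the project: reconnectDelay is a hypothetical helper, the 1000 ms base matches the delay used in this post, and the doubling and the 30 second cap are my own choices.

```typescript
// Delay in milliseconds before reconnect attempt number `attempt` (0 for the first retry).
// The delay doubles with every failed attempt and never exceeds maxMs.
function reconnectDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  const delay = baseMs * Math.pow(2, attempt);
  return Math.min(delay, maxMs);
}

const delays = [0, 1, 2, 3, 4, 5, 6].map(attempt => reconnectDelay(attempt));
// delays: 1000, 2000, 4000, 8000, 16000, 30000, 30000
```

The service code that follows keeps things simpler and retries after a fixed 1000 ms. A helper like this could supply the delay instead, with the attempt counter incremented on every error and reset to 0 once a message arrives again.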
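private eventSource: EventSource;
// Handle of the pending reconnect timeout, if any
private timer: any;

constructor() {
}

getServerSentEvent(moduleName: string): Observable<any> {
    return new Observable(observer => {
        this.startEventSource(moduleName, observer);
        // Teardown: stop the SSE when the consumer unsubscribes
        return () => this.closeEvent();
    });
}

private startEventSource(filter: string, observer) {
    this.eventSource = this.getEventSource(filter);
    this.eventSource.onmessage = event => {
        if (event.data) {
            const notification = JSON.parse(event.data);
            observer.next(notification);
        }
    };
    this.eventSource.onerror = error => {
        // Drop the broken connection and try again after one second
        this.closeEvent();
        this.timer = setTimeout(() => this.startEventSource(filter, observer), 1000);
    };
}

closeEvent() {
    // Cancel a pending reconnect so a closed connection does not reopen itself
    if (this.timer) {
        clearTimeout(this.timer);
        this.timer = null;
    }
    this.eventSource.close();
}

private getEventSource(filter: string): EventSource {
    // this.url is the base URL of the notification endpoint, defined elsewhere in the service
    return new EventSource(this.url + '/' + filter);
}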
In this example we start the SSE, and if an error occurs we simply restart the connection after a delay of 1 second. If the connection is disrupted (containers are a fickle thing), it will be restarted automatically. Once the consumer is done, it can call the closeEvent method to stop the SSE. That is all. Happy coding!!