Have you ever had that one piece of code that kept coming back and not because it was working like a charm?
I had to implement a Managed File Transfer solution with camel. This meant that we had to pick up a file, offload it to a share. Send the location through AMQ and pick it up from another server. After we receive the message from AMQ we had to pick up the file and sent it to its actual destination, simple right? Camel has a cool component for this, the content-enricher. This component can be used by polling consumers like the sftp component. Just what we needed!
On paper this looks like a simple camel route, however running it in production we noticed some ‘features’ of the pollEnrich component. In this post I will describe them and explain how to prevent server crashes, unreachable SFTP servers etc.
Because we needed to pick up a specific file from the sftp we started with a dynamic URL in combination with the pollEnrich component.
pollEnrich().simple(“sftp://server:port/location?fileName=${body.fileName}”)
After some testing we thought it worked like a charm. However when you let it run for a while you and look at the number of threads you will notice a lot of them, one for each new exchange. This will continue until you have no threads left. After a bit of googling we found the following thread: stackoverflow
If you go to the camel ticket and scroll to the bottom you will see the following line:
“This is more complicated to implement in a good way, we are closing this ticket for now.”
In short because it is a dynamic endpoint it is added to the endpoint registry but never actively removed after it is done. But fear not, the solution is easier than you might expect. You simply have to add a thread pool to the sftp component and the amount of threads are limited by that pool.
@Bean("PollEnrichThreadPool") @Autowired public ScheduledExecutorService scheduledExecutorService(CamelContext context) { return context.getExecutorServiceManager().newScheduledThreadPool(this, "RFTM-PollEnrichThreadPool", 8); }
The above code is an example of a threadpool, you just have to add the following property to the sftp endpoint configuration:
scheduledExecutorService=#PollEnrichThreadPool
Now after running the code again for a while we got a message that the sftp server was no longer responding because the max amount of open connections where reached. Looking again in the thread pool of our server we noticed that our connections where never closed. So we added the disconnect option to the configuration. This option stops the sftp connection after the pollEnrich is done.
After running it again on our server for a longer period we noticed that some transactions would simply hang. For some reason, a network hiccup or whatever, the polling consumer would never find a file and would wait until the end of the world. Restarting the context would resolve the issue and the file was consumed again. So we needed to add a timeout to the pollEnrich to trigger an automatic retry and to end the endless waiting.
Now you as you might expect this was not the last issue we found. We had to add the streamDownload option because large files were also triggering the timeout on the pollEnrich. When streaming the file, the sftp component immediately passes the stream to the next step in the process. If it is not enabled it will first load the file and then sent it to the next step in the process. If this takes too long the timeout is triggered again and again.
The last item we added was the “sendEmptyMessageWhenIdle” option. When the file is not on the server you don’t want to wait for the timeout but simply throw an exception and be done with it.
So in short when using the pollEnrich in combination with a file component check the following settings:
- scheduledExecutorService
- timeout
- disconnect
- streamDownload
- sendEmptyMessageWhenIdle