ActiveMQ connection pooling using Weld

Last week I, again, had to create a connection pool for a camel route that was sending messages to an ActiveMQ broker. This time I had to specify it in a project that uses Weld as an injection framework. I based my configuration on the ActiveMQ documentation.

The amount of connections and concurrent consumers is based on the standard documentation. The documentation states that you can easily set those numbers without any major impact. Increasing the amount is of course possible, it will however have impact on your memory usage. Before changing it make sure it is required for your application. I decided to go for the “standard” settings and change it whenever the application usage exceeds my expectations.

The documentation also states that a connectionFactory needs to be started and stopped for it to work correctly. Because I was using CDI I had to look for a solution to do this on startup / shutdown of the application. I found the “Initialized/Destroyed” annotations in combination with the “Observes” annotation. The “Observer” will make sure that the method is called whenever the the application scope is ether initialized or destroyed.


package nl.janssen.coolproject.mq;

import io.fabric8.annotations.ServiceName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

public class ActiveMQComponentFactory {

  public void init(@Observes @Initialized(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.start();
  }

  public void destroy(@Observes @Destroyed(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.stop();
  }

 @Produces
  @Named("connectionFactory")
  @ApplicationScoped
  public ActiveMQConnectionFactory jmsConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");

    return factory;
  }

  @Produces
  @Named("pooledConnectionFactory")
  @ApplicationScoped
  public PooledConnectionFactory createPooledConnectionFactory(@Named("connectionFactory") ActiveMQConnectionFactory factory){
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setConnectionFactory(factory);
    return pooledConnectionFactory;
  }

  @Produces
  @Named("jmsConfiguration")
  @ApplicationScoped
  public JmsConfiguration createJmsConfiguration(@Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    JmsConfiguration jmsConfiguration =  new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setConcurrentConsumers(10);
    return  jmsConfiguration;
  }

  @Produces
  @Named
  @Alias("jms")
  @ApplicationScoped
  public ActiveMQComponent createActiveMQComponent(@Named("jmsConfiguration") JmsConfiguration jmsConfiguration){
    ActiveMQComponent component = new ActiveMQComponent();
    component.setConfiguration(jmsConfiguration);
    return component;
  }

}

And of course to use the connection pool in your route you have to refer to the ActiveMQComponent. In my case I refered to the alias “jms”.


public class IndexerApi extends RouteBuilder {

  @Override
  public void configure() throws Exception {

    from("direct:somewhere")
        .to("jms:queue:randomQueueName")
  }
}