ActiveMQ DLQ use OriginalDestination in Camel

Ah the DLQ, the place where messages go to die. When messages end up in the DLQ (Dead letter queue) of ActiveMQ they receive an additional message header with the queue name where the message resided originally. This message header called Original Destination, can be viewed from Hawtio.

DLQSubscriber-1

When using Apache Camel as a subscriber I noticed something strange. Normally all JMS Headers and properties are mapped to the Camel Exchange headers and properties. However, the OriginalDestination header was not.

For example when creating this dummy route for testing purposes I got the following output:


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.camel.builder.RouteBuilder;

public class DLQSubscriberRouteBuilder extends RouteBuilder{

    @Override
    public void configure() throws Exception {
        from("amq:queue:ActiveMQ.DLQ").log("${headers}");
    }

}


 


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-13:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438348926673, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:1:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438348926663, JMSType=null, JMSXGroupID=null, JMSXUserID=null, originalExpiration=1438348926673}

I spent quite some time trying to figure out what happened when I realized the JMS message on the DLQ is not the exact same message from the original queue with some extra headers and properties. The original message seems to be wrapped inside the message placed on the DLQ. To access this message we need to work with the “raw” JMSMessage from ActiveMQ rather the processed message from Camel. This can be done by creating a processor which uses a typeconverter in Camel to get the JMS message from the Camel exchange.

In the code below I created a processor which fetches the inner message and from that message retrieves the originalDestination property (which is an instance variable on the ActiveMQMessage class (ActiveMQTextMessage is a subclass of ActiveMQMessage) and places this property as an header on the Camel exchange.


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsMessage;

public class DQLMessageProcessor implements Processor{

    @Override
    public void process(Exchange exchange) throws Exception {
        JmsMessage jmsMsg = exchange.getIn().getBody(JmsMessage.class);
        ActiveMQTextMessage innerMsg = (ActiveMQTextMessage) jmsMsg.getJmsMessage();
        exchange.getIn().setHeader("OriginalDestination", innerMsg.getOriginalDestination());
    }

}


Now when observing the output log of .log(“${headers}”) we see the OriginalDestination header in the log output:


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-21:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438350527570, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:4:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438350527560, JMSType=null, JMSXGroupID=null, JMSXUserID=null, OriginalDestination=queue://test2, originalExpiration=1438350527570}

In the end it costs me quite some time to grab the concept of fetching the ActiveMQTextMessage from the JmsMessage, since I was under the impression the Camel ActiveMQ component does this out of the box.

Anyway I hope it helps someone.