Note that there are some explanatory texts on larger screens.

plurals
  1. POWhere is my expired JMS message?
    text
    copied!<p>I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading ExchangePattern.InOnly messages from a JMS queue, and want to expire those which are not processed within a given time explicitly to a named dead letter queue.</p> <p>I have the following route:</p> <pre><code>public class FulfillmentRequestRoute extends RouteBuilder { @Override public void configure() throws Exception { errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage()); from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&amp;timeToLive=10000&amp;transacted=true") .transacted() .to("mock:initialProcessor"); } } </code></pre> <p>And the following ActiveMQ config:</p> <pre><code>&lt;!-- Configure the ActiveMQ JMS broker server to listen on TCP port 61610 --&gt; &lt;broker:broker useJmx="true" persistent="true" brokerName="myBroker"&gt; &lt;broker:transportConnectors&gt; &lt;!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side --&gt; &lt;broker:transportConnector name="vm" uri="vm://myBroker" /&gt; &lt;!-- expose a TCP transport for clients to use --&gt; &lt;broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" /&gt; &lt;/broker:transportConnectors&gt; &lt;broker:persistenceAdapter&gt; &lt;broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/&gt; &lt;/broker:persistenceAdapter&gt; &lt;broker:destinationPolicy&gt; &lt;broker:policyMap&gt; &lt;broker:policyEntries&gt; &lt;!-- Set the following policy on all queues using the '&gt;' wildcard --&gt; &lt;broker:policyEntry queue="&gt;"&gt; &lt;broker:deadLetterStrategy&gt; &lt;broker:sharedDeadLetterStrategy processExpired="true" processNonPersistent="true" /&gt; &lt;/broker:deadLetterStrategy&gt; &lt;/broker:policyEntry&gt; &lt;/broker:policyEntries&gt; &lt;/broker:policyMap&gt; &lt;/broker:destinationPolicy&gt; &lt;/broker:broker&gt; &lt;!-- Configure Camel ActiveMQ to use the embedded ActiveMQ broker declared above --&gt; &lt;!-- Using the ActiveMQComponent gives us connection pooling for free --&gt; &lt;bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"&gt; &lt;property name="brokerURL" value="vm://myBroker" /&gt; &lt;property name="transacted" value="true"/&gt; &lt;property name="transactionManager" ref="jmsTransactionManager"/&gt; &lt;property name="acceptMessagesWhileStopping" value="false"/&gt; &lt;/bean&gt; &lt;bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"&gt; &lt;property name="connectionFactory" ref="jmsConnectionFactory"/&gt; &lt;/bean&gt; &lt;bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&gt; &lt;property name="brokerURL" value="vm://myBroker" /&gt; &lt;/bean&gt; </code></pre> <p>Finally I have a Unit Test which creates two messages,one which will be processed, and the other which should time out.</p> <pre><code>@RunWith(CamelSpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"}) public class FulfillmentRequestTimeoutTest { @EndpointInject(uri = "mock:initialProcessor") protected MockEndpoint mockEndpoint; @Produce protected ProducerTemplate template; protected ConsumerTemplate consumer; @Autowired @Qualifier("camel-server") protected CamelContext context; @DirtiesContext @Test public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception { // Given consumer = context.createConsumerTemplate(); int expectedValidMessageCount = 2; mockEndpoint.expectedMessageCount(expectedValidMessageCount); // When String xmlBody1 = "&lt;?xml version=\"1.0\"?&gt;&lt;body&gt;THIS WILL NOT TIMEOUT&lt;/body&gt;"; template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1); long ttl = System.currentTimeMillis() - 12000000; String xmlBody2 = "&lt;?xml version=\"1.0\"?&gt;&lt;body&gt;!!!!!TIMED OUT!!!!!&lt;/body&gt;"; template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl); // Then // The second message is not processed mockEndpoint.assertIsSatisfied(); // This should not pass with "2" set above List&lt;Exchange&gt; list = mockEndpoint.getReceivedExchanges(); String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class); assertEquals(xmlBody1, notTimedOutMessageBody); Thread.sleep(5000); // And is instead routed to the timedOut JMS queue Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead"); assertNotNull("Should not lose the message", dlqBody); // This fails assertEquals(xmlBody2, dlqBody); } @Configuration public static class ContextConfig extends SingleRouteCamelConfiguration { @Bean public RouteBuilder route() { return new FulfillmentRequestRoute(); } } } </code></pre> <p>The second message isn't expiring at all, despite taking into account @Petter's tip (thanks) below.</p> <p>I have this Unit-test-pattern working elsewhere in tests which explicitly throw exceptions from with transactions in Camel, but I'd prefer not to have to manually start looking into headers myself when this all seems to be handled already.</p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload