Note that there are some explanatory texts on larger screens.

plurals
  1. POHow do I handle RabbitMQ Consumer Cancellation Notification when using Spring ChannelAwareMessageListener
    primarykey
    data
    text
    <p>Newbie to RabbitMQ and new to Java.</p> <p>I'm attempting to write a listener that will use manual acks and handle consumer cancellation notifications using the java Spring AMQP abstraction. Can I accomplish both tasks by using the Spring abstraction?</p> <p>I want to write a listener that will pull messages from a queue and process that message (maybe write to a database or something). I planned on using manual acknowledgements so that if processing of the message fails or can't be completed for some reason, I can reject and requeue. So far I think I've found that in order to manually ack/nack/reject using Spring AMQP I have to use a <code>ChannelAwareMessageListener</code>.</p> <p>I realize that I should be handling Consumer Cancellation Notifications from RabbitMQ, however using the <code>ChannelAwareMessageListener</code> I don't really see a way to code for this. The only way I see to handle CCN is to write code using the lower level java client api by calling <code>channel.basicConsume()</code> and passing a new <code>DefaultConsumer</code> instance which allows you to handle message delivery and cancels.</p> <p>I also don't see how I would set the <code>clientProperties</code> on the <code>ConnectionFactory</code> (to tell the broker I can handle the CCN) since I am getting the factory from a bean in config.</p> <p>My pseudo code of the listener and creation of container is below.</p> <pre><code>public class MyChannelAwareListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { msgProcessed = processMessage(message); if(msgProcessed) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); else channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } public static void main(String[] args) throws Exception { ConnectionFactory rabbitConnectionFactory; ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH); rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory"); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); MyChannelAwareListener listener = new MyChannelAwareListener(); container.setMessageListener(listener); container.setQueueNames("myQueue"); container.setConnectionFactory(rabbitConnectionFactory); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.start(); } </code></pre>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload