Note that there are some explanatory texts on larger screens.

plurals
  1. POSingle queue: concurrent message processing with multiple consumers
    primarykey
    data
    text
    <p>I am new to jms. The goal is to process messages concurrently from a queue in an asynchronous listener's onMessage method by attaching a listener instance to multiple consumer's with each consumer using its own session and running in a separate thread, that way the messages are passed on to the different consumers for concurrent processing. </p> <p>1) Is it possible to process messages concurrently from a single queue by creating multiple consumers ? 2) I came up with the below code, but would like to get your thoughts on whether the below code looks correct for what I want to accomplish. </p> <pre><code>public class QueueConsumer implements Runnable, MessageListener { public static void main(String[] args) { QueueConsumer consumer1 = new QueueConsumer(); QueueConsumer consumer2 = new QueueConsumer(); try { consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON"); consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON"); } catch (JMSException ex) { ex.printStackTrace(); System.exit(-1); } Thread newThread1 = new Thread(consumer1); Thread newThread2 = new Thread(consumer1); newThread1.start(); newThread2.start(); } private static String connectionFactoryName = null; private static String queueName = null; private static ConnectionFactory qcf = null; private static Connection queueConnection = null; private Session ses = null; private Destination queue = null; private MessageConsumer msgConsumer = null; public static final Logger logger = LoggerFactory .getLogger(QueueConsumer.class); public QueueConsumer() { super(); } public void onMessage(Message msg) { if (msg instanceof TextMessage) { try { //process message } catch (JMSException ex) { ex.printStackTrace(); } } } public void run() { try { queueConnection.start(); } catch (JMSException e) { e.printStackTrace(); System.exit(-1); } while (!Thread.currentThread().isInterrupted()) { synchronized (this) { try { wait(); } catch (InterruptedException ex) { break; } } } } public void init(String factoryName, String queue2) throws JMSException { try { qcf = new JMSConnectionFactory(factoryName); queueConnection = qcf.createConnection(); ses = queueConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); queue = ses.createQueue(queue2); logger.info("Subscribing to destination: " + queue2); msgConsumer = ses.createConsumer(queue); msgConsumer.setMessageListener(this); System.out.println("Listening on queue " + queue2); } catch (Exception e) { e.printStackTrace(); System.exit(-1); } } private static void setConnectionFactoryName(String name) { connectionFactoryName = name; } private static String getQueueName() { return queueName; } private static void setQueueName(String name) { queueName = name; } </code></pre> <p>}</p>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload