Note that there are some explanatory texts on larger screens.

plurals
  1. POHornetQ messages still remaining in queue after consuming using core api
    text
    copied!<p>I am new to HornetQ so please bear with me. Let me first tell you my requirements :</p> <p>I need a messages queuing middleware which can pass messages , of about 1k in size, between different process with low latency and persistence(i.e. it should survive system crashes). I would be having multiple processes writing to the same queues and similarly multiple process reading from the same queue.</p> <p><em>For this I chose HornetQ as it has the best rating for message passing with persistence.</em></p> <p>I am currently usung <strong>Hornetq v2.2.2Final</strong> as <strong><em>stand alone server</em></strong>.<br> I am able to succesfully create durable/non-durable queues using <strong>core api</strong> <em>(ClientSession)</em>, and successfully post messages to queue <em>(ClientProducer)</em>.<br> Similarly I am able to read the messages from the queue using core api <em>(ClientConsumer)</em>. </p> <p>The problem comes after this when the client has read the message, the message still remains in the queue, i.e. <strong>the number of messages in the queue remains constant</strong>. Maybe I am getting this wrong but I was under this impression that once the message is consumed <strong><em>(read + ack)</em></strong>, it is removed from the queue.But this is not happening in my case, and the same messages is being read over and over again.</p> <p><em>Also , I would like to tell that I have tried using non-durable queues with non-durable messages. but the problem remains</em>.</p> <p>Code for producer that I am using:</p> <pre><code>public class HQProducer implements Runnable { private ClientProducer producer; private boolean killme; private ClientSession session; private boolean durableMsg; public HQProducer(String host, int port, String address, String queueName, boolean deleteQ, boolean durable, boolean durableMsg, int pRate) { this.durableMsg = durableMsg; try { HashMap map = new HashMap(); map.put("host", host); map.put("port", port); TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); ClientSessionFactory factory = locator.createSessionFactory(); session = factory.createSession(); if (queueExists(queueName)) { if (deleteQ) { System.out.println("Deleting existing queue :: " + queueName); session.deleteQueue(queueName); System.out.println("Creating queue :: " + queueName); session.createQueue(address, queueName, true); } } else { System.out.println("Creating new queue :: " + queueName); session.createQueue(address, queueName, durable); } producer = session.createProducer(SimpleString.toSimpleString(address), pRate); killme = false; } catch (Exception ex) { Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); } } @Override public void run() { long time = System.currentTimeMillis(); int cnt = 0; long timediff; while (!killme) { try { ClientMessage message = session.createMessage(durableMsg); message.getBodyBuffer().writeString("Hello world"); producer.send(message); cnt++; timediff = ((System.currentTimeMillis() - time) / 1000); if (timediff &gt;= 1) { System.out.println("Producer tps :: " + cnt); cnt = 0; time = System.currentTimeMillis(); } } catch (HornetQException ex) { Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); } } try { session.close(); } catch (HornetQException ex) { Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); } } public void setKillMe(boolean killme) { this.killme = killme; } private boolean queueExists(String qname) { boolean res = false; try { //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname)); QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname)); if (queueQuery.isExists()) { res = true; } } catch (HornetQException ex) { res = false; } return res; } } </code></pre> <p>Also the code for consumer is : </p> <pre><code>public class HQConsumer implements Runnable { private ClientSession session; private ClientConsumer consumer; private boolean killMe; public HQConsumer(String host, int port, String queueName, boolean browseOnly) { try { HashMap map = new HashMap(); map.put("host", host); map.put("port", port); TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); ClientSessionFactory factory = locator.createSessionFactory(); session = factory.createSession(); session.start(); consumer = session.createConsumer(queueName, "",0,-1,browseOnly); killMe = false; } catch (Exception ex) { Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); } } @Override public void run() { long time = System.currentTimeMillis(); int cnt = 0; long timediff; while (!killMe) { try { ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge(); //System.out.println("message = " + msgReceived.getBodyBuffer().readString()); cnt++; timediff = ((System.currentTimeMillis() - time) / 1000); if (timediff &gt;= 1) { System.out.println("ConSumer tps :: " + cnt); cnt = 0; time = System.currentTimeMillis(); } } catch (HornetQException ex) { Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); } } try { session.close(); } catch (HornetQException ex) { Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); } } public void setKillMe(boolean killMe) { this.killMe = killMe; } } </code></pre> <p>HornetQ server config ::</p> <pre><code>&lt;configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"&gt; &lt;paging-directory&gt;${data.dir:../data}/paging&lt;/paging-directory&gt; &lt;bindings-directory&gt;${data.dir:../data}/bindings&lt;/bindings-directory&gt; &lt;journal-directory&gt;${data.dir:../data}/journal&lt;/journal-directory&gt; &lt;journal-min-files&gt;10&lt;/journal-min-files&gt; &lt;large-messages-directory&gt;${data.dir:../data}/large-messages&lt;/large-messages-directory&gt; &lt;connectors&gt; &lt;connector name="netty"&gt; &lt;factory-class&gt;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory&lt;/factory-class&gt; &lt;param key="host" value="${hornetq.remoting.netty.host:localhost}"/&gt; &lt;param key="port" value="${hornetq.remoting.netty.port:5445}"/&gt; &lt;/connector&gt; &lt;connector name="netty-throughput"&gt; &lt;factory-class&gt;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory&lt;/factory-class&gt; &lt;param key="host" value="${hornetq.remoting.netty.host:localhost}"/&gt; &lt;param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/&gt; &lt;param key="batch-delay" value="50"/&gt; &lt;/connector&gt; &lt;/connectors&gt; &lt;acceptors&gt; &lt;acceptor name="netty"&gt; &lt;factory-class&gt;org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory&lt;/factory-class&gt; &lt;param key="host" value="${hornetq.remoting.netty.host:localhost}"/&gt; &lt;param key="port" value="${hornetq.remoting.netty.port:5445}"/&gt; &lt;/acceptor&gt; &lt;acceptor name="netty-throughput"&gt; &lt;factory-class&gt;org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory&lt;/factory-class&gt; &lt;param key="host" value="${hornetq.remoting.netty.host:localhost}"/&gt; &lt;param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/&gt; &lt;param key="batch-delay" value="50"/&gt; &lt;param key="direct-deliver" value="false"/&gt; &lt;/acceptor&gt; &lt;/acceptors&gt; &lt;security-settings&gt; &lt;security-setting match="#"&gt; &lt;permission type="createNonDurableQueue" roles="guest"/&gt; &lt;permission type="deleteNonDurableQueue" roles="guest"/&gt; &lt;permission type="createDurableQueue" roles="guest"/&gt; &lt;permission type="deleteDurableQueue" roles="guest"/&gt; &lt;permission type="consume" roles="guest"/&gt; &lt;permission type="send" roles="guest"/&gt; &lt;/security-setting&gt; &lt;/security-settings&gt; &lt;address-settings&gt; &lt;!--default for catch all--&gt; &lt;address-setting match="#"&gt; &lt;dead-letter-address&gt;jms.queue.DLQ&lt;/dead-letter-address&gt; &lt;expiry-address&gt;jms.queue.ExpiryQueue&lt;/expiry-address&gt; &lt;redelivery-delay&gt;0&lt;/redelivery-delay&gt; &lt;max-size-bytes&gt;10485760&lt;/max-size-bytes&gt; &lt;message-counter-history-day-limit&gt;10&lt;/message-counter-history-day-limit&gt; &lt;address-full-policy&gt;BLOCK&lt;/address-full-policy&gt; &lt;/address-setting&gt; &lt;/address-settings&gt; &lt;/configuration&gt; </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload