Note that there are some explanatory texts on larger screens.

plurals
  1. POActiveMQ - Consumers not sharing the load
    primarykey
    data
    text
    <p>Let's say I have a queue called notifications and I post 1000 messages to that queue. I also have 20 consumers of that queue. I'd like each of those consumers to take one message from the queue, process it, then get the next available message. </p> <p>What is happening now is that a few of the consumers grab a lot of the messages and process them while other consumers do nothing.</p> <p>Below is a complete test case that demonstrates what I'm seeing. In reality the consumers are all separate processes on separate machines, but this duplicates the behavior exactly:</p> <pre><code>using System; using System.Threading; using Apache.NMS; using Apache.NMS.ActiveMQ; using Spring.Messaging.Nms.Core; namespace QueueTest { class Program { private static object _syncObj = new object(); private static int _row; static void Main() { var connectionFactory = new ConnectionFactory("tcp://localhost:61616"); var template = new NmsTemplate(connectionFactory); for (int i = 0; i &lt; 500; i++) { template.ConvertAndSend("notifications", "hello"); } for (int i = 0; i &lt; 10; i++) { ThreadPool.QueueUserWorkItem(Spawn, i); } Console.ReadKey(); } static void Spawn(object o) { int count = 0; int threadId = (int)o; var ts = new TimeSpan(0,0,0,5); while (true) { var connectionFactory = new ConnectionFactory( "tcp://localhost:61616" + "?nms.PrefetchPolicy.QueuePrefetch=1", String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications")); using (var conn = connectionFactory.CreateConnection()) { conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications"); conn.Start(); using (var session = conn.CreateSession()) { var queue = session.GetQueue("notifications"); using (var consumer = session.CreateConsumer(queue)) { IMessage msg = consumer.Receive(ts); while (msg != null) { lock (_syncObj) { Interlocked.Increment(ref _row); Console.WriteLine("{0}. {1} processed {2}", _row, threadId, ++count); if (_row == 500) { Environment.Exit(0); } } Thread.Sleep(1000); msg = consumer.Receive(ts); } Console.WriteLine("{0} nothing to process", threadId); } } conn.Stop(); } } } } } </code></pre> <p>How do I get it to spread out the messages to all of the consumers more evenly?</p>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload