Subscribe to a queue, receive 1 message, and then unsubscribe
<p>I have a scenario where I need to distribute and process jobs extremely quickly. About 45 jobs will be populated in the queue in a short burst, and I can process about 20 simultaneously (5 machines, 4 cores each). Each job takes a variable amount of time, and, to complicate matters, garbage collection is an issue, so I need to be able to take a consumer offline for garbage collection.</p>
<p>Currently, I have everything working with pop (every consumer pops every 5ms). This seems undesirable because it translates to 600 pop requests per second to RabbitMQ.</p>
<p>I would love it if there were a pop command that would act like subscribe, but for only one message. (The process would block, waiting for input from the RabbitMQ connection, via something akin to Kernel.select.)</p>
<p>I have tried to trick the AMQP gem into doing something like this, but it's not working: I can't seem to unsubscribe until the queue is empty and no more messages are being sent to the consumer. I fear other methods of unsubscribing will lose messages.</p>
<p>consume_1.rb:</p>
<pre><code>require "amqp"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host =&gt; "localhost", :user =&gt; "guest", :pass =&gt; "guest", :vhost =&gt; "/")
  puts "Connected to AMQP broker"

  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("tasks", :auto_delete =&gt; true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
    # Try to leave the consumer ring as soon as the first message arrives.
    queue.unsubscribe { puts "unbound" }
    sleep 3
  end
end
</code></pre>
<p>consumer_many.rb:</p>
<pre><code>require "amqp"

# Imagine the command is something CPU-intensive like image processing.
command = "sleep 0.1"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host =&gt; "localhost", :user =&gt; "guest", :pass =&gt; "guest", :vhost =&gt; "/")
  puts "Connected to AMQP broker"

  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("tasks", :auto_delete =&gt; true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
  end
end
</code></pre>
<p>producer.rb:</p>
<pre><code>require "amqp"

i = 0

EventMachine.run do
  connection = AMQP.connect(:host =&gt; "localhost", :user =&gt; "guest", :pass =&gt; "guest", :vhost =&gt; "/")
  puts "Connected to AMQP broker"

  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("tasks", :auto_delete =&gt; true)
  exchange = AMQP::Exchange.default(channel)

  EM.add_periodic_timer(1) do
    msg = "Message #{i}"
    i += 1
    puts "~ publishing #{msg}"
    # Publish through the default exchange, routed to the "tasks" queue by name.
    exchange.publish(msg, :routing_key =&gt; queue.name)
  end
end
</code></pre>
<p>I'll launch consumer_many.rb and producer.rb. Messages will flow as expected.</p>
<p>When I launch consume_1.rb, it gets every other message (as expected). But it NEVER unsubscribes, because it never finishes processing all of its messages... so on it goes.</p>
<p>How do I get consume_1.rb to subscribe to the queue, get a single message, and then take itself out of the load-balancer ring so it can do its work, without losing any additional pending jobs that might be in the queue and would otherwise be scheduled to be sent to the process?</p>
<p>Tim</p>
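<p>To make the desired semantics concrete, here is a minimal in-process sketch of "block until exactly one job arrives, process it, then decide whether to ask for another". It is only an analogy, not AMQP gem code: Ruby's standard Queue stands in for the RabbitMQ "tasks" queue, and run_workers with its parameters is a hypothetical name introduced for illustration. The job and worker counts are taken from the scenario above.</p>

```ruby
# In-process analogy only: Queue stands in for the RabbitMQ "tasks" queue.
# run_workers and its parameters are hypothetical, not part of the amqp gem.
def run_workers(job_count: 45, worker_count: 20)
  tasks   = Queue.new   # stands in for the broker-side queue
  results = Queue.new   # collects [worker_id, job] pairs in a thread-safe way

  job_count.times { |i| tasks << "Message #{i}" }
  tasks.close           # no more jobs; pop returns nil once the queue is drained

  workers = Array.new(worker_count) do |id|
    Thread.new do
      # pop blocks until a job is available, so there is no 5 ms polling loop,
      # and each worker holds at most one job at a time.
      while (job = tasks.pop)
        sleep 0.001     # placeholder for the real CPU-intensive work
        results << [id, job]
        # A worker could pause here (for example to run GC.start) before
        # asking for the next job; unclaimed jobs simply stay in the queue.
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

processed = run_workers
puts "processed #{processed.size} jobs"
```

<p>In this model a worker that stops asking for jobs never has work pushed to it, so nothing is lost while it is offline. Whether the AMQP gem can be made to behave the same way is exactly what the question asks.</p>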