
Limiting concurrency and rate for Python threads

Given a number of threads, I want to limit the rate of calls to the worker function to, say, one per second.

My idea was to keep track of the last time a call was made across all threads and compare this to the current time in each thread. Then, if `current_time - last_time < rate`, I let the thread sleep for a bit. Something is wrong with my implementation; I presume I may have gotten the wrong idea about how locks work.

My code:

```python
from Queue import Queue
from threading import Thread, Lock
import time

num_worker_threads = 2
rate = 1

q = Queue()
lock = Lock()
last_time = [time.time()]

def do_work(i, idx):
    # Do work here; print is just a dummy.
    print('Thread: {0}, Item: {1}, Time: {2}'.format(i, idx, time.time()))

def worker(i):
    while True:
        # Serialize the timestamp check so only one thread at a time
        # decides whether it needs to wait.
        lock.acquire()
        current_time = time.time()
        interval = current_time - last_time[0]
        last_time[0] = current_time
        if interval < rate:
            time.sleep(rate - interval)
        lock.release()
        item = q.get()
        do_work(i, item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker, args=[i])
    t.daemon = True
    t.start()

for item in xrange(10):
    q.put(item)

q.join()
```

I was expecting to see one call per second to `do_work`; however, I mostly get two calls at the same time (one from each thread), followed by a one-second pause. What is wrong?

---

Edit: The advice to simply throttle the rate at which items are put into the queue was good; however, I remembered that I had to take care of the case in which items are re-added to the queue by the workers. Canonical examples: pagination, or back-off-and-retry in network tasks. I came up with the following. I guess that for actual network tasks the eventlet/gevent libraries may be easier on resources, but this is just an example. It basically uses a priority queue to pile up the requests and uses an extra thread to shovel items from the pile to the actual task queue at an even rate. I simulated re-insertion into the pile by the workers; re-inserted items are then treated first.

```python
import time
import random
from Queue import Queue, PriorityQueue
from threading import Thread

rate = 0.1

def worker(q, q_pile, idx):
    while True:
        item = q.get()
        print("Thread: {0} processed: {1}".format(idx, item[1]))
        # Simulate work that sometimes has to be retried:
        # put the item back on the pile with a fresh priority.
        if random.random() > 0.3:
            print("Thread: {1} reinserting item: {0}".format(item[1], idx))
            q_pile.put((-1 * time.time(), item[1]))
        q.task_done()

def schedule(q_pile, q):
    # Shovel items from the pile to the task queue at an even rate.
    while True:
        if not q_pile.empty():
            print("Items on pile: {0}".format(q_pile.qsize()))
            q.put(q_pile.get())
            q_pile.task_done()
        time.sleep(rate)

def main():
    q_pile = PriorityQueue()
    q = Queue()

    for i in range(5):
        t = Thread(target=worker, args=[q, q_pile, i])
        t.daemon = True
        t.start()

    t_schedule = Thread(target=schedule, args=[q_pile, q])
    t_schedule.daemon = True
    t_schedule.start()

    [q_pile.put((-1 * time.time(), i)) for i in range(10)]

    q_pile.join()
    q.join()

if __name__ == '__main__':
    main()
```
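For reference, one plausible reading of the paired output: `last_time` records the moment the lock was acquired, not the moment the sleep finished, so the thread that just waited never pushes the timestamp forward, and the next thread computes an interval of almost exactly `rate` and sleeps for nearly zero. A minimal sketch of the same shared-timestamp idea with the update moved to after the sleep; only `worker` changes, the rest of the first snippet is assumed unchanged:

```python
def worker(i):
    while True:
        lock.acquire()
        # How long since the last dispatch?
        interval = time.time() - last_time[0]
        if interval < rate:
            time.sleep(rate - interval)
        # Record the dispatch time *after* any sleep, so the next
        # thread measures its wait from this dispatch.
        last_time[0] = time.time()
        lock.release()
        item = q.get()
        do_work(i, item)
        q.task_done()
```

Holding the lock across the sleep is deliberate in this sketch: it serializes the dispatches and guarantees they are spaced at least `rate` apart, at the cost of blocking the other threads while one waits.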
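The producer-side throttling mentioned at the start of the edit is worth spelling out, since it removes the shared lock entirely: workers just consume, and the single producer enforces the rate. A minimal self-contained sketch, mirroring the names of the first snippet:

```python
from Queue import Queue
from threading import Thread
import time

num_worker_threads = 2
rate = 1
q = Queue()

def do_work(i, idx):
    print('Thread: {0}, Item: {1}, Time: {2}'.format(i, idx, time.time()))

def worker(i):
    while True:
        item = q.get()
        do_work(i, item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker, args=[i])
    t.daemon = True
    t.start()

# The single producer enforces the rate: one item per `rate` seconds
# enters the queue, so the workers need no locking at all.
for item in xrange(10):
    q.put(item)
    time.sleep(rate)

q.join()
```

Note that this only bounds how fast items enter the queue; if a worker stalls, the backlog can later drain faster than `rate` unless the queue is bounded, e.g. `Queue(maxsize=1)`.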
 

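One detail of the second snippet worth illustrating is why re-inserted items are treated first: `PriorityQueue` pops the smallest tuple, and `-1 * time.time()` shrinks as time passes, so the most recent insertion wins. A tiny sketch:

```python
from Queue import PriorityQueue
import time

q_pile = PriorityQueue()
q_pile.put((-1 * time.time(), 'original item'))
time.sleep(0.01)
q_pile.put((-1 * time.time(), 'reinserted item'))

# The later put has a larger timestamp and therefore a more negative
# priority; PriorityQueue pops the smallest tuple first.
print(q_pile.get()[1])  # -> reinserted item
print(q_pile.get()[1])  # -> original item
```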