Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    text
    copied!<p><code>concurrent.futures</code> has a minimalistic API. It's easy to use for very straightforward problems, but you don't have a very straightforward problem. If you did, you would already have solved it ;-)</p> <p>You didn't show any of the <code>multiprocessing.Pool</code> code you wrote, but that would be a more promising place to start - assuming you want to solve the problem more than you want to confirm your hope that it must be easy to do if you only you switched to a weaker API ;-)</p> <p>"An obvious" way to proceed using <code>multiprocessing</code> is to use the <code>Pool.apply_async()</code> method, put the async result objects on a bounded <code>Queue.Queue</code>, and have threads in your main program pull those off the <code>Queue</code> and wait for the results to show up. This is easy enough, but it's not magic. It solves your problem because bounded <code>Queues</code> are <strong>the</strong> canonical way to mediate between producers and consumers that run at different speeds. Nothing in <code>concurrent.futures</code> addresses <em>that</em> problem directly, and it's at the heart of your "massive amounts of memory" problem.</p> <pre><code># Define global result_queue only in the main program. import Queue result_queue = Queue.Queue(100) # pick a reasonable max size based on your problem # Run this in as many threads as you like. def consume_results(): while True: a = result_queue.get() if a is None: break output(a.get()) # `output()` is your function ... # main program passes out work, after starting threads for i in range(1000): # the .put() will block so long as the queue is at its max size result_queue.put(pool.apply_async(calculate, args=(i,))) # add sentinels to let threads know they're done for i in range(number_of_threads_you_started): result_queue.put(None) </code></pre> <p>That's the kind of thing you <em>need</em> to keep producers and consumers roughly in balance, and there's nothing in any standard library that will do it for you by magic.</p> <p><strong>EDIT - fleshing it out</strong></p> <p>Here's a complete, executable example anyone with Python3 can run. Notes:</p> <ul> <li>It doesn't use your code fragments, because those rely on an external database module not everyone can run.</li> <li>It sticks to <code>concurrent.futures</code> to manage both processes and threads. It's not really harder to use <code>multiprocessing</code> and <code>threading</code> instead, and indeed the <em>way</em> threads are used here it would be a little easier using <code>threading</code> directly. But this way is clear enough.</li> <li>A <code>concurrent.futures</code> <code>Future</code> object is basically the same thing as a <code>multiprocessing</code> async result object - the API functionalities are just spelled differently.</li> <li>Your problem is not straightforward, because it has multiple stages that can run at different speeds. Again, nothing in any standard library can hide the potentially bad consequences of that by magic. Creating your own bounded queue remains the best solution to that. Memory use here will remain modest for any sane value of <code>MAX_QUEUE_SIZE</code>.</li> <li>You <em>generally</em> don't want to create more CPU-bound worker processes than one less than the number of cores you have available to use. The main program also needs cycles to run, and so does the OS.</li> <li>Once you're used to this stuff, all the comments in this code would be annoying, like seeing the comment "increment by 1" on the code line <code>i += 1</code> ;-)</li> </ul> <p>Here's the code:</p> <pre><code>import concurrent.futures as cf import threading import queue NUM_CPUS = 3 NUM_THREADS = 4 MAX_QUEUE_SIZE = 20 # Runs in worker processes. def producer(i): return i + 10 def consumer(i): global total # We need to protect this with a lock because # multiple threads in the main program can # execute this function simultaneously. with sumlock: total += i # Runs in threads in main program. def consume_results(q): while True: future = q.get() if future is None: break else: consumer(future.result()) if __name__ == "__main__": sumlock = threading.Lock() result_queue = queue.Queue(MAX_QUEUE_SIZE) total = 0 NUM_TO_DO = 1000 with cf.ThreadPoolExecutor(NUM_THREADS) as tp: # start the threads running `consume_results` for _ in range(NUM_THREADS): tp.submit(consume_results, result_queue) # start the worker processes with cf.ProcessPoolExecutor(NUM_CPUS) as pp: for i in range(NUM_TO_DO): # blocks until the queue size &lt;= MAX_QUEUE_SIZE result_queue.put(pp.submit(producer, i)) # tell threads we're done for _ in range(NUM_THREADS): result_queue.put(None) print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2) </code></pre> <p>If all is well, this is the expected output:</p> <pre><code>got 509500 expected 509500 </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload