Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    text
    copied!<p>There's not enough information to be sure, but the problem is very likely to be that <code>Slave.do_work</code> is raising an unhandled exception. (There are many lines of your code that could do that in various different conditions.)</p> <p>When you do that, the child process will just exit.</p> <p>On POSIX systems… well, the full details are a bit complicated, but in the simple case (what you have here), a child process that exits will stick around as a <code>&lt;defunct&gt;</code> process until it gets reaped (because the parent either <code>wait</code>s on it, or exits). Since your parent code doesn't wait on the children until the queue is finished, that's exactly what happens.</p> <p>So, there's a simple duct-tape fix: </p> <pre><code>def do_work(self): self.log(str(os.getpid())) while True: try: # the rest of your code except Exception as e: self.log("something appropriate {}".format(e)) # you may also want to post a reply back to the parent </code></pre> <p>You might also want to break the massive <code>try</code> up into different ones, so you can distinguish between all the different stages where things could go wrong (especially if some of them mean you need a reply, and some mean you don't).</p> <hr> <p>However, it looks like what you're attempting to do is duplicate exactly the behavior of <code>multiprocessing.Pool</code>, but have missed the bar in a couple places. Which raises the question: why not just use <code>Pool</code> in the first place? You could then simplify/optimize things ever further by using one of the <code>map</code> family methods. For example, your entire <code>Master.run</code> could be reduced to:</p> <pre><code>self.init() pool = multiprocessing.Pool(Master.SLAVE_COUNT, initializer=slave_setup) pool.map(slave_job, tables) pool.join() </code></pre> <p>And this will handle exceptions for you, and allow you to return values/exceptions if you later need that, and let you use the built-in <code>logging</code> library instead of trying to build your own, and so on. And it should only take about a dozens lines of minor code changes to <code>Slave</code>, and then you're done.</p> <hr> <p>If you want to submit new jobs from within jobs, the easiest way to do this is probably with a <a href="http://docs.python.org/3/library/concurrent.futures.html#future-objects" rel="noreferrer"><code>Future</code></a>-based API (which turns things around, making the future result the focus and the pool/executor the dumb thing that provides them, instead of making the pool the focus and the result the dumb thing it gives back), but there are multiple ways to do it with <code>Pool</code> as well. For example, right now, you're not returning anything from each job, so, you can just return a list of <code>tables</code> to execute. Here's a simple example that shows how to do it:</p> <pre><code>import multiprocessing def foo(x): print(x, x**2) return list(range(x)) if __name__ == '__main__': pool = multiprocessing.Pool(2) jobs = [5] while jobs: jobs, oldjobs = [], jobs for job in oldjobs: jobs.extend(pool.apply(foo, [job])) pool.close() pool.join() </code></pre> <p>Obviously you can condense this a bit by replacing the whole loop with, e.g., a list comprehension fed into <code>itertools.chain</code>, and you can make it a lot cleaner-looking by passing "a submitter" object to each job and adding to that instead of returning a list of new jobs, and so on. But I wanted to make it as explicit as possible to show how little there is to it.</p> <hr> <p>At any rate, if you think the explicit queue is easier to understand and manage, go for it. Just look at the source for <a href="http://hg.python.org/cpython/file/65e8ac5f073f/Lib/multiprocessing/pool.py#l66" rel="noreferrer"><code>multiprocessing.worker</code></a> and/or <a href="http://hg.python.org/cpython/file/3.3/Lib/concurrent/futures/process.py#l318" rel="noreferrer"><code>concurrent.futures.ProcessPoolExecutor</code></a> to see what you need to do yourself. It's not that hard, but there are enough things you could get wrong (personally, I always forget at least one edge case when I try to do something like this myself) that it's work looking at code that gets it right.</p> <hr> <p>Alternatively, it seems like the only reason you can't use <a href="http://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor" rel="noreferrer"><code>concurrent.futures.ProcessPoolExecutor</code></a> here is that you need to initialize some per-process state (the <code>boto.s3.key.Key</code>, <code>MySqlWrap</code>, etc.), for what are probably very good caching reasons. (If this involves a web-service query, a database connect, etc., you certainly don't want to do that once per task!) But there are a few different ways around that.</p> <p>But you can subclass <code>ProcessPoolExecutor</code> and override the undocumented function <code>_adjust_process_count</code> (see <a href="http://hg.python.org/cpython/file/3.3/Lib/concurrent/futures/process.py#l376" rel="noreferrer">the source</a> for how simple it is) to pass your setup function, and… that's all you have to do.</p> <p>Or you can mix and match. Wrap the <code>Future</code> from <code>concurrent.futures</code> around the <code>AsyncResult</code> from <code>multiprocessing</code>.</p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload