Is it possible to manually lock/unlock a Queue?
<p>I'm curious if there is a way to lock a <code>multiprocessing.Queue</code> object manually.</p>

<p>I have a pretty standard Producer/Consumer pattern set up, in which my main thread constantly produces a series of values and a pool of <code>multiprocessing.Process</code> workers acts on the values produced.</p>

<p>It is all controlled via a sole <code>multiprocessing.Queue()</code>.</p>

<pre><code>import time
import multiprocessing

class Reader(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if isinstance(item, str):
                break

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    reader = Reader(queue)
    reader.start()

    start_time = time.time()
    while time.time() - start_time &lt; 10:
        queue.put(1)

    queue.put('bla bla bla sentinel')
    reader.join()
</code></pre>

<p>The issue I'm running into is that my worker pool cannot consume and process the <code>queue</code> as fast as the main thread inserts values into it. So after some period of time, the queue grows so unwieldy that it raises a <code>MemoryError</code>.</p>

<p>An obvious solution would be to add a wait check in the producer to stall it from putting any more values into the queue, something along the lines of:</p>

<pre><code>while time.time() - start_time &lt; 10:
    queue.put(1)
    while queue.qsize() &gt; some_size:
        time.sleep(.1)

queue.put('bla bla bla sentinel')
reader.join()
</code></pre>

<p>However, because of the funky nature of the program, I'd like to dump everything in the queue to a file for later processing. But without being able to temporarily lock the queue, a worker can't consume everything in it, as the producer is constantly filling it back up -- conceptually, anyway. After numerous tests it seems that at some point one of the locks wins (usually the one adding to the queue).</p>

<p>Edit: Also, I realize it'd be possible to simply stop the producer and consume the queue from that thread... but that makes the Single Responsibility guy in me feel sad, as the producer is a Producer, not a Consumer.</p>

<h3>Edit:</h3>

<p>After looking through the source of <code>Queue</code>, I came up with this:</p>

<pre><code>def dump_queue(q):
    q._rlock.acquire()
    try:
        res = []
        while not q.empty():
            res.append(q._recv())
            q._sem.release()
        return res
    finally:
        q._rlock.release()
</code></pre>

<p>However, I'm too scared to use it! I have no idea whether this is "correct" or not. I don't have a firm enough grasp of <code>Queue</code>'s internals to know whether this'll hold up without blowing up.</p>

<p>Anyone know if this'll break? :)</p>
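For the drain itself, a similar effect is achievable without touching <code>Queue</code>'s private <code>_rlock</code>/<code>_sem</code> attributes: repeatedly <code>get()</code> with a short timeout until the queue stays empty. This is only a sketch, not the poster's code; the 0.1-second timeout is an arbitrary choice, and a concurrent producer can still interleave new items, so the producer should be paused first if an exact snapshot is required.

```python
import multiprocessing
import queue  # provides the queue.Empty exception used below

def dump_queue(q, timeout=0.1):
    """Drain a multiprocessing.Queue using only its public API."""
    items = []
    while True:
        try:
            # get() with a timeout avoids relying on qsize()/empty(),
            # which the docs describe as approximate for
            # multiprocessing.Queue.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # No item arrived within the timeout: treat as drained.
            return items

if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(5):
        q.put(i)
    print(dump_queue(q))  # -> [0, 1, 2, 3, 4]
```

The trade-off is that each drain pays one final timeout before returning, but nothing here depends on `Queue` internals surviving a Python upgrade.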
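As an alternative to the <code>qsize()</code>-polling loop sketched in the question, <code>multiprocessing.Queue</code> accepts a <code>maxsize</code> argument, and <code>put()</code> simply blocks while the queue is full, which throttles a fast producer without any polling. A minimal sketch; the bound of 2 and the timeout are arbitrary choices for illustration:

```python
import multiprocessing
import queue  # provides the queue.Full exception used below

# A bounded queue: put() blocks once maxsize items are waiting,
# so the producer is paced by the consumers automatically.
q = multiprocessing.Queue(maxsize=2)
q.put('a')
q.put('b')

try:
    # With the queue full, put() would block; a timeout surfaces
    # that as queue.Full instead of hanging forever.
    q.put('c', timeout=0.1)
except queue.Full:
    print('queue full; producer throttled')
```

With a sensible bound in place, the producer never outruns the workers far enough to hit a `MemoryError` in the first place.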
 
