Note that there are some explanatory texts on larger screens.

plurals
  1. POmultiprocessing.Manager() issues in Python
    primarykey
    data
    text
    <p>My apologies for the code dump below, but I figured I'd err on the side of too much context as opposed to too little. </p> <p>I'm attempting to write an async decorator that uses multiprocessing.Manager() for shared memory. As a test case, I'm twice calling a function that adds 1 to a variable of a manager.Namespace() instance. After both calls, I check the value of that variable.</p> <p>80% of the time I see what I would expect: a value of 2. About 20% of the time, the value of the variable is only 1, and I'm at a loss to what could be causing this problem. Any ideas? </p> <pre><code>from multiprocessing import Queue, Process, Manager from collections import defaultdict def queue_function(fn, args, kwargs): async.q.put([fn(*args, **kwargs), id(fn)]) def start_process(fn, args, kwargs): p = Process(target=queue_function, args=(fn, args, kwargs)) p.start() async.processes.append(p) def cleanup(): # ensure no processes remain in a zombie state while async.processes: p = async.processes.pop() p.join() def merge_dicts(d1, d2): for key in ['args', 'kwargs']: d1[key] += d2.get(key, []) return d1 class async: tree = {} q = Queue() processes = [] map = {} manager = Manager() fn_map = {} def __init__(self, callback=False, dependencies=set()): self.callback = callback self.dependencies = dependencies def __call__(self, fn): """Returns decorated function""" def async_fn(*args, **kwargs): fn_call = {'args': [args], 'kwargs': [kwargs]} async.tree[fn] = merge_dicts(fn_call, async.tree.get(fn, {})) # functions cannot be added to queue # work around this by passing an id inst async.fn_map[id(fn)] = fn #mapping from decorated function to undecorated function async.map[async_fn] = fn return async_fn @classmethod def begin(self): # applies fn(*args) for each obj in object, ensuring # that the proper attributes of shared_data exist before calling a method # because some functions depend on the results of other functions, this is # a semi-synchronous operation -- certain methods must be guaranteed to # terminate before others # aliasing tree, q, processes = async.tree, async.q, async.processes fn_map = async.fn_map # start a new process for each object that has no dependencies for fn, v in tree.items(): for i in range(len(v['args'])): args, kwargs = v['args'].pop(), v['kwargs'].pop() start_process(fn, args, kwargs) # read from queue as items are added i = 0 while i &lt; len(processes): # update note with new data result, fn_id = async.q.get() fn = fn_map[fn_id] i += 1 cleanup() if __name__ == '__main__': @async() def add(x): shared.sum += x return shared.sum d = defaultdict(int) for i in range(100): #shared data shared = async.manager.Namespace() shared.sum = 0 add(1) add(1) async.begin() d[shared.sum] += 1 print d </code></pre>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload