<blockquote> <p>(1) How could I modify the decorator so that if the f object it takes is an instance method of a class, then the executor it returns is also an instance method of that class object (so that this business about not being able to pickle does not happen, since I can pickle those instance methods)?</p> </blockquote> <pre><code>&gt;&gt;&gt; myfoo.parSquare
&lt;bound method Foo.executor of &lt;__main__.Foo object at 0x101332510&gt;&gt;
</code></pre> <p>As you can see, <code>parSquare</code> is actually <code>executor</code>, which has become an instance method. This is no surprise, since decorators are essentially function wrappers ...</p> <p><a href="https://stackoverflow.com/questions/739654/understanding-python-decorators">How to make a chain of function decorators?</a> probably has the best description of decorators.</p> <blockquote> <p>(2) Is it better to create additional _pickle_function and _unpickle_function methods?</p> </blockquote> <p>You don't need to; Python already supports pickling plain functions. As a matter of fact, <code>copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)</code> seems a bit strange, since you are using the same algorithm to pickle both types.</p> <p>Now the bigger question is why we are getting <code>PicklingError: Can't pickle &lt;type 'function'&gt;: attribute lookup __builtin__.function failed</code>. The error itself seems somewhat vague, but it looks like pickling failed to look something up, presumably our function.<br/>
I think what is going on is that the decorator rebinds the function's name to one that was defined internally: in your case <code>parSquare</code> becomes <code>executor</code>, but <code>executor</code> is an internal function of <code>parallel</code>, so it isn't importable, and hence the lookup fails. This is just a hunch.</p> <p>Let's try a simpler example.</p> <pre><code>&gt;&gt;&gt; def parallel(function):
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result
...     return apply
...
&gt;&gt;&gt; @parallel
... def square(value):
...     return value**2
...
&gt;&gt;&gt; square([1,2,3,4])
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle &lt;type 'function'&gt;: attribute lookup __builtin__.function failed
</code></pre> <p>Pretty much the same error we were getting.<br/>
Note that the above code is equivalent to:</p> <pre><code>def parallel(function):
    def apply(values):
        from multiprocessing import Pool
        pool = Pool(4)
        result = pool.map(function, values)
        pool.close()
        pool.join()
        return result
    return apply

def square(value):
    return value**2

square = parallel(square)
</code></pre> <p>which produces the same error. Also note that it works if we don't overwrite the original function's name:</p> <pre><code>&gt;&gt;&gt; def parallel(function):
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result
...     return apply
...
&gt;&gt;&gt; def _square(value):
...     return value**2
...
&gt;&gt;&gt; square = parallel(_square)
&gt;&gt;&gt; square([1,2,3,4])
[1, 4, 9, 16]
&gt;&gt;&gt;
</code></pre> <p>It works just fine, because <code>_square</code> is still importable from the module under its own name. I've been looking for a way to control how decorators interact with names, but to no avail. I still want to use them with multiprocessing, so I came up with a somewhat ugly workaround:</p> <pre><code>&gt;&gt;&gt; def parallel(function):
...     def temp(_):
...         def apply(values):
...             from multiprocessing import Pool
...             pool = Pool(4)
...             result = pool.map(function, values)
...             pool.close()
...             pool.join()
...             return result
...         return apply
...     return temp
...
&gt;&gt;&gt; def _square(value):
...     return value*value
...
&gt;&gt;&gt; @parallel(_square)
... def square(values):
...     pass
...
&gt;&gt;&gt; square([1,2,3,4])
[1, 4, 9, 16]
&gt;&gt;&gt;
</code></pre> <p>So basically I passed the real function to the decorator, then used a second function to deal with the values; as you can see, it works just fine.</p> <p>I've slightly modified your initial code to better handle the decorator, though it isn't perfect.</p> <pre><code>import types, copy_reg, multiprocessing as mp

def parallel(f):
    def executor(*args):
        _pool = mp.Pool(2)
        func = getattr(args[0], f.__name__) # This gets the actual bound method so we can use our own pickling procedure
        _result = _pool.map(func, args[1])
        _pool.close()
        _pool.join()
        return _result
    return executor

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor
            break
        else:
            for attr in dir(cls):
                prop = getattr(cls, attr)
                if hasattr(prop, '__call__') and prop.__name__ == func_name:
                    func = cls.__dict__[attr]
                    break
    if func is None:
        raise KeyError("Couldn't find function %s within %s" % (str(func_name), str(cls)))
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args

    def squareArg(self, arg):
        return arg**2

    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q

    @parallel
    def parSquare(self, num):
        return self.squareArg(num)

if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)
</code></pre> <p>Fundamentally this still fails, giving us <code>AssertionError: daemonic processes are not allowed to have children</code>, since the unpickled method resolves back to <code>executor</code>, which tries to create another <code>Pool</code> inside a worker process. Keep in mind that subprocesses don't really copy the code, only the names ...</p> <p>One workaround is similar to the one I mentioned previously:</p> <pre><code>import types, copy_reg, multiprocessing as mp

def parallel(f):
    def temp(_):
        def executor(*args):
            _pool = mp.Pool(2)
            func = getattr(args[0], f.__name__) # This gets the actual bound method so we can use our own pickling procedure
            _result = _pool.map(func, args[1])
            _pool.close()
            _pool.join()
            return _result
        return executor
    return temp

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name]
            break
        else:
            for attr in dir(cls):
                prop = getattr(cls, attr)
                if hasattr(prop, '__call__') and prop.__name__ == func_name:
                    func = cls.__dict__[attr]
                    break
    if func is None:
        raise KeyError("Couldn't find function %s within %s" % (str(func_name), str(cls)))
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args

    def squareArg(self, arg):
        return arg**2

    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q

    def _parSquare(self, num):
        return self.squareArg(num)

    @parallel(_parSquare)
    def parSquare(self, num):
        pass

if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)
</code></pre> <p>which prints:</p> <pre><code>[1, 4, 9, 16]
[1, 4, 9, 16]
</code></pre> <p>One last thing: be very careful with multiprocessing. Depending on how you segment your data, you can actually end up slower than a single process, mainly due to the overhead of copying values back and forth as well as creating and destroying subprocesses.</p> <p>Always benchmark the single-process and multiprocess versions, and segment your data properly when possible.</p> <p>Case in point:</p> <pre><code>import numpy
import time
from multiprocessing import Pool

def square(value):
    return value*value

if __name__ == '__main__':
    pool = Pool(5)
    values = range(1000000)

    start = time.time()
    _ = pool.map(square, values)
    pool.close()
    pool.join()
    end = time.time()
    print "multithreaded time %f" % (end - start)

    start = time.time()
    _ = map(square, values)
    end = time.time()
    print "single threaded time %f" % (end - start)

    start = time.time()
    _ = numpy.asarray(values)**2
    end = time.time()
    print "numpy time %f" % (end - start)

    v = numpy.asarray(values)
    start = time.time()
    _ = v**2
    end = time.time()
    print "numpy without pre-initialization %f" % (end - start)
</code></pre> <p>gives us:</p> <pre><code>multithreaded time 0.484441
single threaded time 0.196421
numpy time 0.184163
numpy without pre-initialization 0.004490
</code></pre>
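Much of the overhead the benchmark exposes comes from pickling each item back and forth between the parent and the workers. One mitigation, sketched below under the same toy <code>square</code> setup, is <code>Pool.map</code>'s real <code>chunksize</code> parameter, which batches items into fewer, larger tasks (the particular values here are illustrative, not tuned):

```python
from multiprocessing import Pool

def square(value):
    return value * value

if __name__ == '__main__':
    pool = Pool(4)
    values = range(100000)
    # The third argument to Pool.map is chunksize: how many items are
    # shipped to a worker per task. Larger chunks mean fewer
    # pickle/unpickle round trips, at the cost of coarser load balancing.
    result = pool.map(square, values, 10000)
    pool.close()
    pool.join()
    print(result[:3])  # [0, 1, 4]
```

Whether a given chunksize helps depends on how expensive the function is relative to the serialization cost, so it is still worth benchmarking against the single-process version.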
 
