
Trouble speeding up application using Multiprocessing+Threads in Python
<p>I have a CPU-bound application that I wish to speed up using multiprocessing+threading instead of the pure threaded version. I wrote a simple application to check the performance of my approach and was surprised to see that both the multiprocessing and the multiprocessing+threaded versions perform worse than the threaded and serial versions.</p> <p>In my application I have a work queue that stores all the work. The threads pop off one work item at a time and then process it either directly (threaded version) or by passing it into a process. The thread then needs to wait for the result to arrive before proceeding with the next iteration. The reason I need to pop off one work item at a time is that the work is dynamic (not the case in the prototype code pasted below) and I cannot pre-partition the work and hand it off to each thread/process at creation time.</p> <p>I would like to know what I am doing wrong and how I could speed up my application.</p> <p>Here are the execution times from a run on a 16-core machine:</p> <pre><code>Version   : 2.7.2
Compiler  : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform  : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699 ms
mainPureMultiprocessing exec time: 2241.89805984 ms
mainPureThreaded exec time: 309.767007828 ms
mainSerial exec time: 52.3412227631 ms
Terminating
</code></pre> <p>and here is the code I used:</p> <pre><code>import threading
import multiprocessing
import time
import platform


class ConcurrentQueue:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()

    def push(self, item):
        self.lock.acquire()
        try:
            self.data.append(item)
        finally:
            self.lock.release()
        return

    def pop(self):
        self.lock.acquire()
        result = None
        try:
            length = len(self.data)
            if length &gt; 0:
                result = self.data.pop()
        finally:
            self.lock.release()
        return result

    def isEmpty(self):
        self.lock.acquire()
        result = 0
        try:
            result = len(self.data)
        finally:
            self.lock.release()
        return result == 0


def timeFunc(passedFunc):
    def wrapperFunc(*arg):
        startTime = time.time()
        result = passedFunc(*arg)
        endTime = time.time()
        elapsedTime = (endTime - startTime) * 1000
        print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
        return result
    return wrapperFunc


def checkPrime(candidate):
    # dummy workload: trial division over odd divisors
    for k in xrange(3, candidate, 2):
        if candidate % k == 0:
            return False
    return True


def fillQueueWithWork(itemQueue, numItems):
    for item in xrange(numItems, 2 * numItems):
        itemQueue.push(item)


@timeFunc
def mainSerial(numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return


# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
    curThread = threading.currentThread()
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return


@timeFunc
def mainPureThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)
    for worker in workers:
        worker.join()
    return
# End: Implement a pure threaded version


# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return


@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    queueSize = (numItems / numProcesses) + 10
    for index in xrange(numProcesses):
        jobs = multiprocessing.Queue(queueSize)
        results = multiprocessing.Queue(queueSize)
        loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
        loopProcess.start()
        workers.append((loopProcess, jobs, results))
    processIndex = 0
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        workers[processIndex][1].put_nowait(dataItem)
        processIndex += 1
        if numProcesses == processIndex:
            processIndex = 0
    for worker in workers:
        worker[1].put_nowait(None)
    for worker in workers:
        worker[0].join()
    return
# End: Implement a pure multiprocessing version


# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return


def mpThreadFunc(jobQueue):
    curThread = threading.currentThread()
    threadName = curThread.getName()
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    myProcessName = "Process-" + threadName
    myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
    myProcess.start()
    while True:
        dataItem = jobQueue.pop()
        # put item to allow process to start
        jobs.put_nowait(dataItem)
        # terminate loop if work queue is empty
        if dataItem is None:
            break
        # wait to get result from process
        result = results.get()
        # do something with result
    return


@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)
    for worker in workers:
        worker.join()
    return
# End: Implement a threaded+multiprocessing version


if __name__ == '__main__':
    print 'Version   :', platform.python_version()
    print 'Compiler  :', platform.python_compiler()
    print 'Platform  :', platform.platform()
    print 'Processor :', platform.processor()
    numThreads = 8
    numItems = 16000  # 200000
    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems
    mainMultiprocessAndThreaded(numThreads, numItems)
    mainPureMultiprocessing(numThreads, numItems)
    mainPureThreaded(numThreads, numItems)
    mainSerial(numItems)
    print "Terminating"
</code></pre> <p>Edit: One of my guesses for the slowness is that the Queue.put() calls are busy-waiting instead of relinquishing the GIL. If so, any suggestions on an alternate data structure I should be using?</p>
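For comparison, the per-item queue round-trips above can be avoided by handing batches of items to a process pool and letting the pool distribute them. This is only a minimal sketch of the same prime-checking workload, written in Python 3 syntax rather than the Python 2 of the question; `check_all` and the `chunk` parameter are illustrative names, not part of the original code:

```python
import multiprocessing


def check_prime(candidate):
    # trial division: candidate is composite if any divisor splits it evenly
    if candidate < 2:
        return False
    if candidate % 2 == 0:
        return candidate == 2
    for k in range(3, int(candidate ** 0.5) + 1, 2):
        if candidate % k == 0:
            return False
    return True


def check_all(num_procs, items, chunk=256):
    # Pool.map ships `chunk` items per IPC message, so workers spend their
    # time computing instead of blocking on per-item queue traffic
    with multiprocessing.Pool(num_procs) as pool:
        return pool.map(check_prime, items, chunksize=chunk)


if __name__ == '__main__':
    flags = check_all(4, range(16000, 32000))
    print(sum(flags), "primes found")
```

The key difference from the thread-per-process design in the question is that nothing waits on a single result before submitting the next item; batching amortizes the pickling and queue-synchronization cost that dominates when each work item is tiny.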