Note that there are some explanatory texts on larger screens.

plurals
  1. POpython multiprocessing each with own subprocess (Kubuntu,Mac)
    text
    copied!<p>I've created a script that by default creates one multiprocessing Process; then it works fine. When starting multiple processes, it starts to hang, and not always in the same place. The program's about 700 lines of code, so I'll try to summarise what's going on. I want to make the most of my multi-cores, by parallelising the slowest task, which is aligning DNA sequences. For that I use the subprocess module to call a command-line program: 'hmmsearch', which I can feed in sequences through /dev/stdin, and then I read out the aligned sequences through /dev/stdout. I imagine the hang occurs because of these multiple subprocess instances reading / writing from stdout / stdin, and I really don't know the best way to go about this... I was looking into os.fdopen(...) &amp; os.tmpfile(), to create temporary filehandles or pipes where I can flush the data through. However, I've never used either before &amp; I can't picture how to do that with the subprocess module. Ideally I'd like to bypass using the hard-drive entirely, because pipes are much better with high-throughput data processing! Any help with this would be super wonderful!!</p> <pre><code>import multiprocessing, subprocess from Bio import SeqIO class align_seq( multiprocessing.Process ): def __init__( self, inPipe, outPipe, semaphore, options ): multiprocessing.Process.__init__(self) self.in_pipe = inPipe ## Sequences in self.out_pipe = outPipe ## Alignment out self.options = options.copy() ## Modifiable sub-environment self.sem = semaphore def run(self): inp = self.in_pipe.recv() while inp != 'STOP': seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time. # HMM is a file location. align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) self.sem.acquire() align_process.stdin.write( seq_record.format('fasta') ) align_process.stdin.close() for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer align_process.wait() # Don't know if there's any need for this?? self.sem.release() align_process.stdout.close() inp = self.in_pipe.recv() self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles. self.out_pipe.close() </code></pre> <p>Having spent a while debugging this, I've found a problem that was always there and isn't quite solved yet, but have fixed some other <em>inefficiencies</em> in the process (of debugging). There are two initial feeder functions, this align_seq class and a file parser <strong>parseHMM()</strong> which loads a position specific scoring matrix (PSM) into a dictionary. The main parent process then compares the alignment to the PSM, using a dictionary (of dictionaries) as a pointer to the relevant score for each residue. In order to calculate the scores I want I have two separate multiprocessing.Process classes, one class <strong>logScore()</strong> that calculates the log odds ratio (with math.exp() ); I parallelise this one; and it Queues the calculated scores to the last Process, <strong>sumScore()</strong> which just sums these scores (with math.fsum), returning the sum and all position specific scores back to the parent process as a dictionary. i.e. Queue.put( [sum, { residue position : position specific score , ... } ] ) I find this exceptionally confusing to get my head around (too many queue's!), so I hope that readers are managing to follow... After all the above calculations are done, I then give the option to save the cumulative scores as tab-delimited output. This is where it now (since last night) sometimes breaks, as I ensure it prints out a score for every position where there should be a score. I think that due to latency (computer timings being out-of-sync), sometimes what gets put in the Queue first for <strong>logScore</strong> doesn't reach <strong>sumScore</strong> first. In order that sumScore knows when to return the tally and start again, I put 'endSEQ' into the Queue for the last logScore process that performed a calculation. I thought that then it should reach sumScore last too, but that's not always the case; only sometimes does it break. So now I don't get a deadlock anymore, but instead a KeyError when printing or saving the results. I believe the reason for sometimes getting KeyError is because I create a Queue for each logScore process, but that instead they should all use the same Queue. Now, where I have something like:-</p> <pre><code>class logScore( multiprocessing.Process ): def __init__( self, inQ, outQ ): self.inQ = inQ ... def scoreSequence( processes, HMMPSM, sequenceInPipe ): process_index = -1 sequence = sequenceInPipe.recv_bytes() for residue in sequence: .... ## Get the residue score. process_index += 1 processes[process_index].inQ.put( residue_score ) ## End of sequence processes[process_index].inQ.put( 'endSEQ' ) logScore_to_sumScoreQ = multiprocessing.Queue() logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ] sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut ) </code></pre> <p>whereas I should create just one Queue to share between all the logScore instances. i.e.</p> <pre><code>logScore_to_sumScoreQ = multiprocessing.Queue() scoreSeq_to_logScore = multiprocessing.Queue() logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ] sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut ) </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload