
Broken Python pipeline in Hadoop streaming
<p>I have a large-scale log-processing problem that I have to run on a Hadoop cluster. The task is to feed each line of the log into an executable "cmd" and check the result to decide whether to keep that line of the log or not.</p>

<p>Since the "cmd" program opens a very large dictionary, I cannot afford to launch the program for every line of the log. I want to keep it running and feed the required input to it. My current solution uses the subprocess module of Python, and here is the code:</p>

<pre><code>import sys
from subprocess import Popen, PIPE

def main():
    pp = Popen('./bqc/bqc/bqc_tool ./bqc/bqc/bqc_dict/ ./bqc/bqc/word_dict/ flag',
               shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    for line in sys.stdin:
        lstr = line.strip().split('\t')
        if len(lstr) != 7:
            continue
        pp.stdin.write('%s\n' % lstr[5])
        pp.stdin.flush()
        out = pp.stdout.readline()
        lout = out.strip().split('\t')
        if len(lout) == 3 and lout[1] == '401':
            print line.strip()

if __name__ == '__main__':
    main()
</code></pre>

<p>The above code works fine when tested on my local machine. It is used as the mapper when submitting the job to Hadoop. I use no reducer, and the following is the configuration:</p>

<pre><code>hadoop streaming \
    -input /path_to_input \
    -output /path_to_output \
    -mapper "python/python2.7/bin/python27.sh ./mapper.py" \
    -cacheArchive /path_to_python/python272.tar.gz#python \
    -cacheArchive /path_to_cmd/bqc.tar.gz#bqc \
    -file ./mapper.py \
    -jobconf mapred.job.name="JobName" \
    -jobconf mapred.job.priority=HIGH
</code></pre>

<p>The files in bqc.tar.gz look like this:</p>

<pre><code>bqc/
bqc/bqc_tool
bqc/bqc_dict/
bqc/word_dict/
</code></pre>

<p>In my opinion, the option "-cacheArchive /path_to_cmd/bqc.tar.gz#bqc" should extract the tar file into a folder called bqc.</p>

<p>But the job fails when submitted to the Hadoop cluster, with the following error message:</p>

<pre><code>Traceback (most recent call last):
  File "./mapper.py", line 19, in &lt;module&gt;
    main()
  File "./mapper.py", line 11, in main
    pp.stdin.write('%s\n' % lstr[5])
IOError: [Errno 32] Broken pipe
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:152)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
    at org.apache.hadoop.mapred.Child.main(Child.java:194)
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:163)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
    at org.apache.hadoop.mapred.Child.main(Child.java:194)
</code></pre>

<p>Anyone got an idea? Any help would be appreciated!</p>

<p>Thanks!</p>

<p>Zachary</p>
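<p>One general observation: an <code>IOError: [Errno 32] Broken pipe</code> on <code>pp.stdin.write</code> means the child process has already exited, usually because it failed at startup (for example, it could not find its dictionary files on the task node). The sketch below shows one way to surface the child's exit code and stderr in the task logs instead of dying with a bare broken pipe. The <code>feed</code> helper and the simulated failing child are illustrative assumptions, not part of the original mapper:</p>

```python
import sys
from subprocess import Popen, PIPE

def feed(pp, payload):
    """Write one line to the child and read one line back; on a broken
    pipe, report the child's exit code and stderr before re-raising,
    so the real startup failure shows up in the task logs.
    (Hypothetical helper for illustration.)"""
    try:
        pp.stdin.write(payload + b'\n')
        pp.stdin.flush()
        return pp.stdout.readline()
    except IOError:  # [Errno 32]: the child has already died
        err = pp.stderr.read()
        code = pp.wait()
        sys.stderr.write('child exited with code %d, stderr: %r\n'
                         % (code, err))
        raise

# Demo: a child that exits immediately, mimicking a tool that fails
# to start on the cluster node.
pp = Popen([sys.executable, '-c', 'import sys; sys.exit(1)'],
           stdin=PIPE, stdout=PIPE, stderr=PIPE)
pp.wait()  # the child is already gone before we write anything

try:
    feed(pp, b'some log line')
except IOError:
    outcome = 'broken pipe detected'
    try:
        pp.stdin.close()  # discard the unflushable buffer quietly
    except IOError:
        pass
```

<p>Running the mapper by hand on a task node (or with the same relative paths the streaming job uses) together with a wrapper like this would distinguish "the tool never started" from "the tool crashed mid-stream".</p>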
 
