How to prevent Hadoop stream from closing?
<p>I built a basic web parser that uses Hadoop to hand off URLs to multiple threads. This works pretty well until I reach the end of my input file: Hadoop declares itself done while there are still threads running. This results in the error org.apache.hadoop.fs.FSError: java.io.IOException: Stream Closed. Is there any way to keep the stream open long enough for the threads to finish up? (I can predict, with reasonable accuracy, the maximum amount of time a thread will spend on a single URL.)</p>

<p>Here's how I execute the threads:</p>

<pre><code>public static class Map extends MapReduceBase implements Mapper&lt;LongWritable, Text, Text, Text&gt; {
    private Text word = new Text();
    private URLPile pile = new URLPile();
    private MSLiteThread[] Threads = new MSLiteThread[16];
    private boolean once = true;

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector&lt;Text, Text&gt; output, Reporter reporter) {
        String url = value.toString();
        StringTokenizer urls = new StringTokenizer(url);
        Config.LoggerProvider = LoggerProvider.DISABLED;
        System.out.println("In Mapper");

        // Start the worker threads on the first call to map() only.
        if (once) {
            for (MSLiteThread thread : Threads) {
                System.out.println("created thread");
                thread = new MSLiteThread(pile);
                thread.start();
            }
            once = false;
        }

        // Queue every URL on this input line for the worker threads.
        while (urls.hasMoreTokens()) {
            try {
                word.set(urls.nextToken());
                String currenturl = word.toString();
                pile.addUrl(currenturl, output);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }
}
</code></pre>

<p>The threads themselves get the URLs like this:</p>

<pre><code>public void run() {
    try {
        sleep(3000);
        while (!done()) {
            try {
                System.out.println("in thread");
                MSLiteURL tempURL = pile.getNextURL();
                String currenturl = tempURL.getURL();
                urlParser.parse(currenturl);
                urlText.set("");
                titleText.set(currenturl + urlParser.export());
                System.out.println(urlText.toString() + titleText.toString());
                // Write the result through the collector that map() received.
                tempURL.getOutput().collect(urlText, titleText);
                pile.doneParsing();
                sleep(30);
            } catch (Exception e) {
                pile.doneParsing();
                e.printStackTrace();
                continue;
            }
        }
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    System.out.println("Thread done");
}
</code></pre>

<p>And the relevant methods in URLPile are:</p>

<pre><code>public synchronized void addUrl(String url, OutputCollector&lt;Text, Text&gt; output)
        throws InterruptedException {
    // Block the mapper while the queue holds more than 16 pending URLs.
    while (queue.size() &gt; 16) {
        System.out.println("queue full");
        wait();
    }
    finishedParcing--;
    queue.add(new MSLiteURL(output, url));
    notifyAll();
}

private Queue&lt;MSLiteURL&gt; queue = new LinkedList&lt;MSLiteURL&gt;();
private int sent = 0;
private int finishedParcing = 0;

public synchronized MSLiteURL getNextURL() throws InterruptedException {
    notifyAll();
    sent++;
    //System.out.println(queue.peek());
    return queue.remove();
}
</code></pre>
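<p>One possible direction, sketched here in plain Java rather than against the Hadoop API: the old <code>mapred</code> API gives a mapper a <code>close()</code> method (inherited from <code>MapReduceBase</code>) that runs after the last <code>map()</code> call, so the mapper could block there until its workers have drained. The sketch below only illustrates that "block in close() until the pool is idle" pattern. The class <code>DrainingMapperSketch</code>, its <code>map</code>/<code>close</code> signatures, the in-memory <code>collected</code> list (a stand-in for <code>OutputCollector</code>), and the 50 ms simulated parse are all assumptions made for illustration, not part of the code above. It also assumes the framework keeps the output stream open until <code>close()</code> returns.</p>

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DrainingMapperSketch {
    // Hypothetical stand-in for Hadoop's OutputCollector: records what workers emit.
    private final List<String> collected = new CopyOnWriteArrayList<>();

    // Fixed pool of 16 workers, mirroring the 16 threads in the question.
    private final ExecutorService workers = Executors.newFixedThreadPool(16);

    // Plays the role of map(): hand the URL to the pool and return immediately.
    public void map(String url) {
        workers.submit(() -> {
            try {
                Thread.sleep(50); // simulated fetch-and-parse time (assumption)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            collected.add(url); // stands in for output.collect(...)
        });
    }

    // Plays the role of close(): stop accepting work, then wait for queued
    // and running tasks to finish, up to the given timeout.
    // Returns true if every task completed before the timeout elapsed.
    public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
        workers.shutdown();
        return workers.awaitTermination(timeout, unit);
    }

    public List<String> collected() {
        return collected;
    }

    public static void main(String[] args) throws InterruptedException {
        DrainingMapperSketch mapper = new DrainingMapperSketch();
        for (int i = 0; i < 40; i++) {
            mapper.map("http://example.com/" + i);
        }
        boolean drained = mapper.close(30, TimeUnit.SECONDS);
        System.out.println("drained=" + drained
                + " collected=" + mapper.collected().size());
    }
}
```

<p>In a real mapper the timeout could be derived from the known per-URL upper bound multiplied by the number of URLs still queued. Whether this resolves the Stream Closed error depends on the Hadoop version and on the collector still being writable during <code>close()</code>, so it is worth verifying on a small input first.</p>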
 
