Note that there are some explanatory texts on larger screens.

plurals
  1. POhaving issues with BlockingQueue java , implementation of Storm (Distributed computing)?
    text
    copied!<p>This is the code snippt of my input spout for emmiting tuple to a processing noded for stream processing over a cluster. The problem is The BlockingQueue is throwing InterruptedException .</p> <pre><code>private SpoutOutputCollector collector; public BlockingQueue&lt;String&gt; blockingQueue = new LinkedBlockingQueue&lt;String&gt;(); public boolean isDistributed() { return true; } public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { this.collector=collector; } @Override public void nextTuple() { try { //Utils.sleep(100); collector.emit(new Values("Single Temperature Reading", blockingQueue.take())); } catch (InterruptedException e) { e.printStackTrace(); } } public void readInputfile() throws IOException, InterruptedException{ FileInputStream file = new FileInputStream("/home/529076/Desktop/Temperature"); DataInputStream readDate=new DataInputStream(file); BufferedReader readText=new BufferedReader(new InputStreamReader(readDate)); String line; String singleReading = null; while((line=readText.readLine())!=null){ singleReading=line; blockingQueue.add(singleReading); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("Single Temperature Reading")); } </code></pre> <p>The exception description is as followes :---</p> <p>java.lang.InterruptedException10930 [Thread-20] INFO backtype.storm.util - Async loop interrupted!</p> <pre><code>at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399) at com.tcs.storm.test.InputStreamSpout.nextTuple(InputStreamSpout.java:65) at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:413) </code></pre> <p>And The nextTuple(InputStreamSpout.java:65 is ------></p> <pre><code> collector.emit(new Values("Single Temperature Reading", blockingQueue.take())); </code></pre> <p>Thanks </p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload