Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    primarykey
    data
    text
    <p>While I cannot figure a way to improve the design, at least we can pull out the generic component into a utility class. With the threading code pulled out, BulkCalc3 is sufficiently concise.</p> <pre><code>class BulkCalc3 { final Calc calc; public BulkCalc3(Calc calc) { this.calc = calc; } public TreeMap&lt;Integer, Double&gt; calc(Iterator&lt;BigThing&gt; in) { final ConcurrentMap&lt;Integer, Double&gt; resultMap = new MapMaker().makeMap(); ThreadedIteratorProcessor&lt;BigThing&gt; processor = new ThreadedIteratorProcessor&lt;BigThing&gt;(); processor.processIterator(in, new ThreadedIteratorProcessor.ElementProcessor&lt;BigThing&gt;() { @Override public void processElement(BigThing o) { resultMap.put(o.getId(), calc.calc(o)); } }); return new TreeMap&lt;Integer, Double&gt;(resultMap); } } </code></pre> <p>Here's the utility class:</p> <pre><code>import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * A utility class to process each element in an iterator in an efficient manner. */ public class ThreadedIteratorProcessor&lt;T&gt; { public static interface ElementProcessor&lt;T&gt; { /** * Process an element. * @param element The element to process. */ public void processElement(T element); } private final int numThreads; /** * Create an instance which uses a specified number of threads. * @param numThreads The number of processing threads. */ public ThreadedIteratorProcessor(int numThreads) { this.numThreads = numThreads; } /** * Create an instance which uses a number of threads equal to the number of system processors. */ public ThreadedIteratorProcessor() { this(Runtime.getRuntime().availableProcessors()); } /** * Process each element in an iterator in parallel. The number of worker threads depends on how this object was * constructed. This method will re-throw any exception thrown in the supplied ElementProcessor. An element will * not be requested from the iterator any earlier than is absolutely necessary. In other words, the last element in * the iterator will not be consumed until all of the other elements are completely processed, excluding elements * currently being processed by the worker threads. * @param iterator The iterator from which to get elements. This iterator need not be thread-safe. * @param elementProcessor The element processor. */ public void processIterator(Iterator&lt;T&gt; iterator, ElementProcessor&lt;T&gt; elementProcessor) { // Use an ExecutorService for proper exception handling. ExecutorService e = Executors.newFixedThreadPool(numThreads, MoreExecutors.daemonThreadFactory()); List&lt;Future&lt;?&gt;&gt; futures = Lists.newLinkedList(); // Get a thread-safe iterator final SafeIterator&lt;T&gt; safeIterator = new SafeIterator&lt;T&gt;(iterator); // Submit numThreads new worker threads to pull work from the iterator. for (int i = 0; i &lt; numThreads; i++) { futures.add(e.submit(new Consumer&lt;T&gt;(safeIterator, elementProcessor))); } e.shutdown(); // Calling .get() on the futures accomplishes two things: // 1. awaiting completion of the work // 2. discovering an exception during calculation, and rethrowing to the client in this thread. for (Future&lt;?&gt; future : futures) { try { future.get(); } catch (InterruptedException ex) { // swallowing is OK } catch (ExecutionException ex) { // Re-throw the underlying exception to the client. throw Throwables.propagate(ex.getCause()); } } } // A runnable that sits in a loop consuming and processing elements from an iterator. private static class Consumer&lt;T&gt; implements Runnable { private final SafeIterator&lt;T&gt; it; private final ElementProcessor&lt;T&gt; elementProcessor; public Consumer(SafeIterator&lt;T&gt; it, ElementProcessor&lt;T&gt; elementProcessor) { this.it = it; this.elementProcessor = elementProcessor; } @Override public void run() { while (true) { T o = it.nextOrNull(); if (o == null) { return; } elementProcessor.processElement(o); } } } // a thread-safe iterator-like object. private static class SafeIterator&lt;T&gt; { private final Iterator&lt;T&gt; in; SafeIterator(Iterator&lt;T&gt; in) { this.in = in; } synchronized T nextOrNull() { if (in.hasNext()) { return in.next(); } return null; } } } </code></pre>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. VO
      singulars
      1. This table or related slice is empty.
    2. VO
      singulars
      1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload