<p>I created a one-off for myself a couple of years ago when I got an 8-core machine, but I wasn't terribly happy with it. I never got it to be as simple to use as I had hoped, and memory-intensive tasks didn't scale well.</p>
<p>If you don't get any <em>real</em> answers I can share more, but the core of it is:</p>
<pre><code>public class LocalMapReduce&lt;TMapInput, TMapOutput, TOutput&gt; {
    private int m_threads;
    private Mapper&lt;TMapInput, TMapOutput&gt; m_mapper;
    private Reducer&lt;TMapOutput, TOutput&gt; m_reducer;
    ...
    public TOutput mapReduce(Iterator&lt;TMapInput&gt; inputIterator)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set&lt;Future&lt;TMapOutput&gt;&gt; futureSet = new HashSet&lt;Future&lt;TMapOutput&gt;&gt;();
        // Map phase: submit one worker per input item.
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future&lt;TMapOutput&gt; f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        // No more tasks will be submitted; let the pool threads exit when done.
        pool.shutdown();
        // Reduce phase: poll until every future has completed and been reduced.
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator&lt;Future&lt;TMapOutput&gt;&gt; fit = futureSet.iterator(); fit.hasNext();) {
                Future&lt;TMapOutput&gt; f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}
</code></pre>
<p>EDIT: Based on a comment, below is a version without <code>sleep</code>. The trick is to use <code>CompletionService</code>, which essentially provides a blocking queue of completed <code>Future</code>s.</p>
<pre><code>public class LocalMapReduce&lt;TMapInput, TMapOutput, TOutput&gt; {
    private int m_threads;
    private Mapper&lt;TMapInput, TMapOutput&gt; m_mapper;
    private Reducer&lt;TMapOutput, TOutput&gt; m_reducer;
    ...
    public TOutput mapReduce(Collection&lt;TMapInput&gt; input)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService&lt;TMapOutput&gt; futurePool = new ExecutorCompletionService&lt;TMapOutput&gt;(pool);
        Set&lt;Future&lt;TMapOutput&gt;&gt; futureSet = new HashSet&lt;Future&lt;TMapOutput&gt;&gt;();
        // Map phase: submit one worker per input item.
        for (TMapInput m : input) {
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        }
        pool.shutdown();
        // Reduce phase: take() blocks until the next worker has finished.
        int n = futureSet.size();
        for (int i = 0; i &lt; n; i++) {
            m_reducer.reduce(futurePool.take().get());
        }
        return m_reducer.getResult();
    }
}
</code></pre>
<p>I'll also note this is a very distilled map-reduce algorithm, including a single reduce worker which does both the reduce and merge operations.</p>
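<p>To make the <code>CompletionService</code> approach concrete, here is a minimal, self-contained sketch that sums word lengths. The <code>Mapper</code> and <code>Reducer</code> interfaces below are hypothetical stand-ins, since their definitions are elided above; the class name <code>WordLengthSum</code> and the helper <code>totalLength</code> are likewise made up for illustration.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WordLengthSum {
    // Hypothetical stand-ins for the Mapper and Reducer types elided in the answer.
    interface Mapper<I, O> { Callable<O> makeWorker(I input); }
    interface Reducer<I, O> { void reduce(I value); O getResult(); }

    static <I, M, O> O mapReduce(List<I> input, Mapper<I, M> mapper,
                                 Reducer<M, O> reducer, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<M> completed = new ExecutorCompletionService<>(pool);
            // Map phase: one worker per input item.
            for (I item : input) {
                completed.submit(mapper.makeWorker(item));
            }
            // Reduce phase: take() blocks until some worker finishes, so results
            // are reduced in completion order on the calling thread only.
            for (int i = 0; i < input.size(); i++) {
                reducer.reduce(completed.take().get());
            }
            return reducer.getResult();
        } finally {
            pool.shutdown();
        }
    }

    // Illustrative job: map each word to its length, reduce by summing.
    static int totalLength(List<String> words) {
        Mapper<String, Integer> lengthOf = word -> () -> word.length();
        Reducer<Integer, Integer> sum = new Reducer<Integer, Integer>() {
            private int total = 0;
            public void reduce(Integer value) { total += value; }
            public Integer getResult() { return total; }
        };
        try {
            return mapReduce(words, lengthOf, sum, 4);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("map", "reduce", "java"))); // prints 13
    }
}
```

<p>Because the single reducer is only ever called from the submitting thread, it needs no synchronization; the trade-off is that a slow <code>reduce</code> becomes the bottleneck.</p>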
 
