<p>I created a one-off for myself a couple of years ago when I got an 8-core machine, but I wasn't terribly happy with it. I never got it to be as simple to use as I had hoped, and memory-intensive tasks didn't scale well.</p>
<p>If you don't get any <em>real</em> answers I can share more, but the core of it is:</p>
<pre><code>import java.util.*;
import java.util.concurrent.*;

public class LocalMapReduce&lt;TMapInput, TMapOutput, TOutput&gt; {
    private int m_threads;
    private Mapper&lt;TMapInput, TMapOutput&gt; m_mapper;
    private Reducer&lt;TMapOutput, TOutput&gt; m_reducer;
    ...
    public TOutput mapReduce(Iterator&lt;TMapInput&gt; inputIterator)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set&lt;Future&lt;TMapOutput&gt;&gt; futureSet = new HashSet&lt;Future&lt;TMapOutput&gt;&gt;();
        // Submit one map task per input element.
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future&lt;TMapOutput&gt; f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        pool.shutdown(); // no more tasks; worker threads exit once the queue drains
        // Poll until every finished map result has been handed to the reducer.
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator&lt;Future&lt;TMapOutput&gt;&gt; fit = futureSet.iterator(); fit.hasNext();) {
                Future&lt;TMapOutput&gt; f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}
</code></pre>
<p>EDIT: Based on a comment, below is a version without <code>sleep</code>. The trick is to use <code>CompletionService</code>, which essentially provides a blocking queue of completed <code>Future</code>s.</p>
<pre><code>import java.util.*;
import java.util.concurrent.*;

public class LocalMapReduce&lt;TMapInput, TMapOutput, TOutput&gt; {
    private int m_threads;
    private Mapper&lt;TMapInput, TMapOutput&gt; m_mapper;
    private Reducer&lt;TMapOutput, TOutput&gt; m_reducer;
    ...
    public TOutput mapReduce(Collection&lt;TMapInput&gt; input)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService&lt;TMapOutput&gt; futurePool =
                new ExecutorCompletionService&lt;TMapOutput&gt;(pool);
        Set&lt;Future&lt;TMapOutput&gt;&gt; futureSet = new HashSet&lt;Future&lt;TMapOutput&gt;&gt;();
        for (TMapInput m : input) {
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        }
        pool.shutdown();
        // take() blocks until the next map task finishes, so results arrive in completion order.
        int n = futureSet.size();
        for (int i = 0; i &lt; n; i++) {
            m_reducer.reduce(futurePool.take().get());
        }
        return m_reducer.getResult();
    }
}
</code></pre>
<p>I'll also note that this is a very distilled map-reduce algorithm, with a single reduce worker that performs both the reduce and the merge operations.</p>
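<p>The answer leaves out the <code>Mapper</code> and <code>Reducer</code> types and the constructor, so the following is a minimal self-contained sketch of the <code>CompletionService</code> version that compiles and runs on its own. The two interfaces, the constructor, and the <code>SumReducer</code> class are hypothetical stand-ins, not the original author's definitions; their shapes are only inferred from how the answer calls <code>makeWorker</code>, <code>reduce</code>, and <code>getResult</code>.</p>

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical: assumed shape of the answer's Mapper (one map task per input element).
interface Mapper<TMapInput, TMapOutput> {
    Callable<TMapOutput> makeWorker(TMapInput input);
}

// Hypothetical: assumed shape of the answer's Reducer (single-threaded reduce + merge).
interface Reducer<TMapOutput, TOutput> {
    void reduce(TMapOutput mapOutput);
    TOutput getResult();
}

class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private final int m_threads;
    private final Mapper<TMapInput, TMapOutput> m_mapper;
    private final Reducer<TMapOutput, TOutput> m_reducer;

    // Hypothetical constructor; the original elides it with "...".
    LocalMapReduce(int threads, Mapper<TMapInput, TMapOutput> mapper,
                   Reducer<TMapOutput, TOutput> reducer) {
        m_threads = threads;
        m_mapper = mapper;
        m_reducer = reducer;
    }

    TOutput mapReduce(Collection<TMapInput> input)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        try {
            CompletionService<TMapOutput> completed = new ExecutorCompletionService<>(pool);
            int submitted = 0;
            for (TMapInput m : input) {
                completed.submit(m_mapper.makeWorker(m));
                submitted++;
            }
            // Each take() blocks until some map task has finished, so the calling
            // thread acts as the single reducer without polling or sleeping.
            for (int i = 0; i < submitted; i++) {
                m_reducer.reduce(completed.take().get());
            }
            return m_reducer.getResult();
        } finally {
            // Release the worker threads, interrupting any still running after a failure.
            pool.shutdownNow();
        }
    }
}

// Hypothetical example reducer: sums the map outputs.
class SumReducer implements Reducer<Integer, Long> {
    private long sum = 0;

    public void reduce(Integer value) { sum += value; }

    public Long getResult() { return sum; }
}
```

<p>For example, with a mapper <code>x -&gt; () -&gt; x * x</code> and a <code>SumReducer</code>, running <code>mapReduce</code> over <code>1, 2, 3, 4, 5</code> on four threads sums the squares to 55. Since only the calling thread invokes <code>reduce</code>, the reducer needs no synchronization, and a plain counter replaces the answer's <code>futureSet</code> because the futures themselves are never inspected.</p>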