
Why is the Hadoop shuffle not working as expected?
<p>I have Hadoop MapReduce code that works on graph data (in adjacency-list form) and is roughly an in-adjacency-list to out-adjacency-list transformation. The main MapReduce task code is the following:</p>
<pre><code>public class TestTask extends Configured implements Tool {

    public static class TTMapper extends MapReduceBase
            implements Mapper&lt;Text, TextArrayWritable, Text, NeighborWritable&gt; {

        @Override
        public void map(Text key, TextArrayWritable value,
                        OutputCollector&lt;Text, NeighborWritable&gt; output,
                        Reporter reporter) throws IOException {
            int numNeighbors = value.get().length;
            double weight = (double) 1 / numNeighbors;
            Text[] neighbors = (Text[]) value.toArray();
            NeighborWritable me = new NeighborWritable(key, new DoubleWritable(weight));
            for (int i = 0; i &lt; neighbors.length; i++) {
                output.collect(neighbors[i], me);
            }
        }
    }

    public static class TTReducer extends MapReduceBase
            implements Reducer&lt;Text, NeighborWritable, Text, Text&gt; {

        @Override
        public void reduce(Text key, Iterator&lt;NeighborWritable&gt; values,
                           OutputCollector&lt;Text, Text&gt; output,
                           Reporter arg3) throws IOException {
            ArrayList&lt;NeighborWritable&gt; neighborList = new ArrayList&lt;NeighborWritable&gt;();
            while (values.hasNext()) {
                neighborList.add(values.next());
            }
            NeighborArrayWritable neighbors =
                    new NeighborArrayWritable(neighborList.toArray(new NeighborWritable[0]));
            Text out = new Text(neighbors.toString());
            output.collect(key, out);
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        JobConf conf = Util.getMapRedJobConf("testJob",
                SequenceFileInputFormat.class,
                TTMapper.class,
                Text.class,
                NeighborWritable.class,
                1,
                TTReducer.class,
                Text.class,
                Text.class,
                TextOutputFormat.class,
                "test/in",
                "test/out");
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new TestTask(), args);
        System.exit(res);
    }
}
</code></pre>
<p>The auxiliary code is the following. TextArrayWritable:</p>
<pre><code>public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }

    public TextArrayWritable(Text[] values) {
        super(Text.class, values);
    }
}
</code></pre>
<p>NeighborWritable:</p>
<pre><code>public class NeighborWritable implements Writable {

    private Text nodeId;
    private DoubleWritable weight;

    public NeighborWritable(Text nodeId, DoubleWritable weight) {
        this.nodeId = nodeId;
        this.weight = weight;
    }

    public NeighborWritable() {
    }

    public Text getNodeId() {
        return nodeId;
    }

    public DoubleWritable getWeight() {
        return weight;
    }

    public void setNodeId(Text nodeId) {
        this.nodeId = nodeId;
    }

    public void setWeight(DoubleWritable weight) {
        this.weight = weight;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        nodeId = new Text();
        nodeId.readFields(in);
        weight = new DoubleWritable();
        weight.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        nodeId.write(out);
        weight.write(out);
    }

    public String toString() {
        return "NW[nodeId=" + (nodeId != null ? nodeId.toString() : "(null)") +
               ",weight=" + (weight != null ? weight.toString() : "(null)") + "]";
    }

    public boolean equals(Object o) {
        if (!(o instanceof NeighborWritable)) {
            return false;
        }
        NeighborWritable that = (NeighborWritable) o;
        return nodeId.equals(that.getNodeId()) &amp;&amp; weight.equals(that.getWeight());
    }
}
</code></pre>
<p>And the Util class:</p>
<pre><code>public class Util {
    public static JobConf getMapRedJobConf(String jobName,
            Class&lt;? extends InputFormat&gt; inputFormatClass,
            Class&lt;? extends Mapper&gt; mapperClass,
            Class&lt;?&gt; mapOutputKeyClass,
            Class&lt;?&gt; mapOutputValueClass,
            int numReducer,
            Class&lt;? extends Reducer&gt; reducerClass,
            Class&lt;?&gt; outputKeyClass,
            Class&lt;?&gt; outputValueClass,
            Class&lt;? extends OutputFormat&gt; outputFormatClass,
            String inputDir,
            String outputDir) throws IOException {
        JobConf conf = new JobConf();
        if (jobName != null)
            conf.setJobName(jobName);

        conf.setInputFormat(inputFormatClass);
        conf.setMapperClass(mapperClass);

        if (numReducer == 0) {
            conf.setNumReduceTasks(0);
            conf.setOutputKeyClass(outputKeyClass);
            conf.setOutputValueClass(outputValueClass);
            conf.setOutputFormat(outputFormatClass);
        } else {
            // may set actual number of reducers
            // conf.setNumReduceTasks(numReducer);
            conf.setMapOutputKeyClass(mapOutputKeyClass);
            conf.setMapOutputValueClass(mapOutputValueClass);
            conf.setReducerClass(reducerClass);
            conf.setOutputKeyClass(outputKeyClass);
            conf.setOutputValueClass(outputValueClass);
            conf.setOutputFormat(outputFormatClass);
        }

        // delete the existing target output folder
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(outputDir), true);

        // specify input and output DIRECTORIES (not files)
        FileInputFormat.addInputPath(conf, new Path(inputDir));
        FileOutputFormat.setOutputPath(conf, new Path(outputDir));

        return conf;
    }
}
</code></pre>
<p>My input is the following graph (stored in binary format; shown here as text):</p>
<pre><code>1 2
2 1,3,5
3 2,4
4 3,5
5 2,4
</code></pre>
<p>According to the logic of the code, the output should be:</p>
<pre><code>1 NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2 NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],NW[nodeId=1,weight=1.0],}]
3 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]
4 NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],}]
5 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]
</code></pre>
<p>But the output actually comes out as:</p>
<pre><code>1 NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2 NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
3 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]
4 NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
5 NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]
</code></pre>
<p>I cannot understand why the expected output is not coming out. Any help will be appreciated.</p>
<p>Thanks.</p>
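Editorial note: the symptom (every entry in a group equal to the last value of that group) is consistent with a well-known Hadoop behavior. In the old-API reducer, the framework reuses a single `Writable` instance for the values iterator: each call to `values.next()` returns the *same* object with its fields overwritten, so `neighborList.add(values.next())` stores many references to one object. The sketch below reproduces the effect in plain Java with no Hadoop dependency; `MutableValue` and `reusingIterator` are made-up stand-ins for `NeighborWritable` and the reducer's values iterator, not real Hadoop types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in for a Hadoop Writable: a mutable holder whose field the
// "framework" overwrites in place between calls to next().
class MutableValue {
    String nodeId;
    MutableValue(String nodeId) { this.nodeId = nodeId; }
    public String toString() { return nodeId; }
}

public class ReuseDemo {

    // Mimics the reducer's values iterator: ONE shared instance, refilled each time.
    static Iterator<MutableValue> reusingIterator(final String[] data) {
        final MutableValue shared = new MutableValue(null);
        return new Iterator<MutableValue>() {
            int i = 0;
            public boolean hasNext() { return i < data.length; }
            public MutableValue next() { shared.nodeId = data[i++]; return shared; }
            public void remove() { throw new UnsupportedOperationException(); }
        };
    }

    // Buggy pattern: store the reference returned by next().
    static List<MutableValue> collectByReference(String[] data) {
        List<MutableValue> out = new ArrayList<MutableValue>();
        for (Iterator<MutableValue> it = reusingIterator(data); it.hasNext(); ) {
            out.add(it.next()); // every slot ends up pointing at the same object
        }
        return out;
    }

    // Fixed pattern: copy the value before storing it.
    static List<MutableValue> collectByCopy(String[] data) {
        List<MutableValue> out = new ArrayList<MutableValue>();
        for (Iterator<MutableValue> it = reusingIterator(data); it.hasNext(); ) {
            MutableValue v = it.next();
            out.add(new MutableValue(v.nodeId)); // deep copy of the current state
        }
        return out;
    }

    public static void main(String[] args) {
        String[] data = {"5", "3", "1"};
        System.out.println(collectByReference(data)); // [1, 1, 1] - all slots share one object
        System.out.println(collectByCopy(data));      // [5, 3, 1]
    }
}
```

If this is indeed the cause, the reducer fix would be to copy each value before adding it to `neighborList`, e.g. via a copy constructor on `NeighborWritable` or `WritableUtils.clone(value, conf)`.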