<h1>1) Intro / Problem</h1>
<p>Before going ahead with the job driver, it is important to understand that in a simple-minded approach, the values reaching each reducer should be <strong>sorted</strong> in ascending order. The first thought is to pass the value list unsorted and do some sorting in the reducer per key. This has two disadvantages:</p>
<p>1) It is most probably <strong>not efficient</strong> for large value lists</p>
<p>and</p>
<p>2) How will the framework know that (1,4) is equal to (4,1) if these pairs are processed in different parts of the cluster?</p>
<h1>2) Solution in theory</h1>
<p>The way to do it in Hadoop is to "mock" the framework by creating a <strong>synthetic key</strong>.</p>
<p>So, instead of the "conceptually more appropriate" (if I may say that)</p>
<p><code>map(k1, v1) -&gt; list(k2, v2)</code></p>
<p>our map function is the following:</p>
<p><code>map(k1, v1) -&gt; list(ksynthetic, null)</code></p>
<p>As you notice, we discard the usage of values (the reducer still gets a list of <code>null</code> values, but we don't really care about them). What happens here is that these values are actually <strong>included</strong> in <code>ksynthetic</code>. Here is an example for the problem in question:</p>
<p><code>map(1, 2) -&gt; list([1,2], null)</code></p>
<p>However, some more operations need to be done so that the keys are grouped and partitioned appropriately and we achieve the correct result in the reducer.</p>
<h1>3) Hadoop Implementation</h1>
<p>We will implement a class called <code>FFGroupComparator</code> and a class <code>FindFriendPartitioner</code>.</p>
<p>Here is our <code>FFGroupComparator</code>:</p>
<pre><code>public static class FFGroupComparator extends WritableComparator {

    protected FFGroupComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(",");
        String[] t2Items = t2.toString().split(",");
        String t1Base = t1Items[0];
        String t2Base = t2Items[0];
        // Compare using only the "real" key part of our synthetic key
        return t1Base.compareTo(t2Base);
    }
}
</code></pre>
<p>This class will act as our grouping comparator class. It controls which keys are grouped together for a single call to <code>Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)</code>. This is very important, as it ensures that each reducer gets the appropriate synthetic keys (judging by the real key).</p>
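<p>To see what the grouping comparator buys us, here is a small walk-through (illustrative only, not part of the job code) of the shuffle for the sample input used below, given that our mapper also emits each reversed pair:</p>
<pre><code>Synthetic keys emitted by the mappers for the input 1,2  2,1  1,3  3,2  2,4  4,1
(each input pair plus its reverse):

  [1,2] [2,1] [2,1] [1,2] [1,3] [3,1] [3,2] [2,3] [2,4] [4,2] [4,1] [1,4]

After sorting, the grouping comparator merges keys with the same real part
(the part before the comma) into one reduce() call each:

  group "1": [1,2] [1,2] [1,3] [1,4]
  group "2": [2,1] [2,1] [2,3] [2,4]
  group "3": [3,1] [3,2]
  group "4": [4,1] [4,2]

(The duplicates appear because the input already contains both 1,2 and 2,1.)
</code></pre>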
<p>Because Hadoop runs on a cluster with many nodes, we must also make sure that all synthetic keys sharing the same real key end up in the same partition (and thus on the same reduce task). So the partitioning decision has to be based on the <strong>real keys</strong> (not the synthetic ones). Usually this is done with hash values; in our case, we compute the partition a synthetic key belongs to from the hash value of the real key (the part before the comma). So our <code>FindFriendPartitioner</code> is as follows:</p>
<pre><code>public static class FindFriendPartitioner extends Partitioner&lt;Text, NullWritable&gt; {

    @Override
    public int getPartition(Text key, NullWritable value, int numPartitions) {
        String[] keyItems = key.toString().split(",");
        String keyBase = keyItems[0];
        // Mask the sign bit so the partition index is never negative
        return (keyBase.hashCode() &amp; Integer.MAX_VALUE) % numPartitions;
    }
}
</code></pre>
<p>So now we are all set to write the actual job and solve our problem.</p>
<p>I am assuming your input file looks like this:</p>
<pre><code>1,2
2,1
1,3
3,2
2,4
4,1
</code></pre>
<p>We will use the <code>TextInputFormat</code>.</p>
<p>Here are the mapper and reducer classes (both nested inside our <code>FindFriendTwo</code> job class), using Hadoop 1.0.4:</p>
<pre><code>public static class FindFriendMapper
        extends Mapper&lt;Object, Text, Text, NullWritable&gt; {

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the pair as read from the input...
        context.write(value, NullWritable.get());
        // ...and also emit the reverse relationship
        String[] tempStrings = value.toString().split(",");
        Text value2 = new Text(tempStrings[1] + "," + tempStrings[0]);
        context.write(value2, NullWritable.get());
    }
}
</code></pre>
<p>Notice that we also pass the reverse relationships in the <code>map</code> function.</p>
<p>For example, if the input pair is (1,4) we must not forget (4,1).</p>
<pre><code>// needs: import java.util.Set; import java.util.LinkedHashSet;
public static class FindFriendReducer
        extends Reducer&lt;Text, NullWritable, IntWritable, IntWritable&gt; {

    private Set&lt;String&gt; friendsSet;

    @Override
    public void setup(Context context) {
        friendsSet = new LinkedHashSet&lt;String&gt;();
    }

    // Thanks to FFGroupComparator, reduce() is called once per real key;
    // syntheticKey holds the first synthetic key of that group.
    @Override
    public void reduce(Text syntheticKey, Iterable&lt;NullWritable&gt; values, Context context)
            throws IOException, InterruptedException {
        String[] tempKeys = syntheticKey.toString().split(",");
        friendsSet.add(tempKeys[1]);
        if (friendsSet.size() == 2) {
            IntWritable key = new IntWritable(Integer.parseInt(tempKeys[0]));
            IntWritable value = new IntWritable(Integer.parseInt(tempKeys[1]));
            context.write(key, value);
        }
    }
}
</code></pre>
<p>Finally, we must remember to include the following in our main class, so that the framework uses our classes:</p>
<pre><code>job.setGroupingComparatorClass(FFGroupComparator.class);
job.setPartitionerClass(FindFriendPartitioner.class);
</code></pre>
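<p>The main class itself is not shown above. For completeness, here is a minimal sketch of what the <code>FindFriendTwo</code> driver could look like with the Hadoop 1.0.x <code>mapreduce</code> API. Apart from the classes defined above, everything here (job name, argument handling) is my own assumption, not part of the original answer:</p>
<pre><code>import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// main() goes inside public class FindFriendTwo, next to the
// mapper, reducer, comparator and partitioner defined above.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "findfriendtwo"); // Job.getInstance(conf) in later versions

    job.setJarByClass(FindFriendTwo.class);
    job.setMapperClass(FindFriendMapper.class);
    job.setReducerClass(FindFriendReducer.class);

    // Wire in the secondary sort machinery described above
    job.setGroupingComparatorClass(FFGroupComparator.class);
    job.setPartitionerClass(FindFriendPartitioner.class);

    // The mappers emit (synthetic key, null) pairs
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);

    // The reducers emit (person, friend) pairs as integers
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
</code></pre>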
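<p>One more note: the whole approach relies on the synthetic keys reaching each reducer in ascending order. With <code>Text</code> keys, the default byte-wise comparison already orders <code>[1,2] &lt; [1,3] &lt; [2,1]</code>, which is enough for the single-digit IDs in the sample input. If your IDs can grow beyond one digit (as a string, "10" sorts before "2"), you would also want a numeric sort comparator along these lines (<code>FFSortComparator</code> is a name I made up; it is not in the original answer):</p>
<pre><code>public static class FFSortComparator extends WritableComparator {

    protected FFSortComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        String[] a = w1.toString().split(",");
        String[] b = w2.toString().split(",");
        // Order numerically: first by the real key, then by the friend id
        int byRealKey = Integer.valueOf(a[0]).compareTo(Integer.valueOf(b[0]));
        if (byRealKey != 0) {
            return byRealKey;
        }
        return Integer.valueOf(a[1]).compareTo(Integer.valueOf(b[1]));
    }
}
</code></pre>
<p>It would be registered in the driver with <code>job.setSortComparatorClass(FFSortComparator.class);</code>, and the grouping comparator should then also compare the real keys numerically so that sorting and grouping stay consistent.</p>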