Note that there are some explanatory texts on larger screens.

plurals
  1. POJoin with Hadoop in Java
    primarykey
    data
    text
    <p>I'm working since short time with Hadoop and trying to implement a join in Java. It doesn't matter if Map-Side or Reduce-Side. I took Reduce-Side join since it was supposed to be easier to implement. I know that Java is not the best choice for joins, aggregations etc. and should better pick Hive or Pig which I have done already. However I'm working on a research project and I have to use all of those 3 languages in order to deliver a comparison. </p> <p>Anyway, I have two input files with different structure. One is key|value and the other one is key|value1;value2;value3;value4. One record from each input file looks like following:</p> <ul> <li>Input1: <code>1;2010-01-10T00:00:01</code></li> <li>Input2: <code>1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59</code></li> </ul> <p>I followed the example in the Hadoop Definitve Guide book, but it didn't work for me. I'm posting my java files here, so you can see what's wrong.</p> <pre><code>public class LookupReducer extends Reducer&lt;TextPair,Text,Text,Text&gt; { private String result = ""; private String msisdn; private String attribute, product; private long trans_dt_long, start_dt_long, end_dt_long; private String trans_dt, start_dt, end_dt; @Override public void reduce(TextPair key, Iterable&lt;Text&gt; values, Context context) throws IOException, InterruptedException { context.progress(); //value without key to remember Iterator&lt;Text&gt; iter = values.iterator(); for (Text val : values) { Text recordNoKey = val; //new Text(iter.next()); String valSplitted[] = recordNoKey.toString().split(";"); //if the input is coming from CDR set corresponding values if(key.getSecond().toString().equals(CDR.CDR_TAG)) { trans_dt = recordNoKey.toString(); trans_dt_long = dateToLong(recordNoKey.toString()); } //if the input is coming from Attributes set corresponding values else if(key.getSecond().toString().equals(Attribute.ATT_TAG)) { attribute = valSplitted[0]; product = valSplitted[1]; start_dt = valSplitted[2]; start_dt_long = dateToLong(valSplitted[2]); end_dt = valSplitted[3]; end_dt_long = dateToLong(valSplitted[3]);; } Text record = val; //iter.next(); //System.out.println("RECORD: " + record); Text outValue = new Text(recordNoKey.toString() + ";" + record.toString()); if(start_dt_long &lt; trans_dt_long &amp;&amp; trans_dt_long &lt; end_dt_long) { //concat output columns result = attribute + ";" + product + ";" + trans_dt; context.write(key.getFirst(), new Text(result)); System.out.println("KEY: " + key); } } } private static long dateToLong(String date){ DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date parsedDate = null; try { parsedDate = formatter.parse(date); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } long dateInLong = parsedDate.getTime(); return dateInLong; } public static class TextPair implements WritableComparable&lt;TextPair&gt; { private Text first; private Text second; public TextPair(){ set(new Text(), new Text()); } public TextPair(String first, String second){ set(new Text(first), new Text(second)); } public TextPair(Text first, Text second){ set(first, second); } public void set(Text first, Text second){ this.first = first; this.second = second; } public Text getFirst() { return first; } public void setFirst(Text first) { this.first = first; } public Text getSecond() { return second; } public void setSecond(Text second) { this.second = second; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub first.readFields(in); second.readFields(in); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub first.write(out); second.write(out); } @Override public int hashCode(){ return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object o){ if(o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) &amp;&amp; second.equals(tp.second); } return false; } @Override public String toString(){ return first + ";" + second; } @Override public int compareTo(TextPair tp) { // TODO Auto-generated method stub int cmp = first.compareTo(tp.first); if(cmp != 0) return cmp; return second.compareTo(tp.second); } public static class FirstComparator extends WritableComparator { protected FirstComparator(){ super(TextPair.class, true); } @Override public int compare(WritableComparable comp1, WritableComparable comp2){ TextPair pair1 = (TextPair) comp1; TextPair pair2 = (TextPair) comp2; int cmp = pair1.getFirst().compareTo(pair2.getFirst()); if(cmp != 0) return cmp; return -pair1.getSecond().compareTo(pair2.getSecond()); } } public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(TextPair.class, true); } @Override public int compare(WritableComparable comp1, WritableComparable comp2) { TextPair pair1 = (TextPair) comp1; TextPair pair2 = (TextPair) comp2; return pair1.compareTo(pair2); } } } } </code></pre> <hr> <pre><code>public class Joiner extends Configured implements Tool { public static final String DATA_SEPERATOR =";"; //Define the symbol for seperating the output data public static final String NUMBER_OF_REDUCER = "1"; //Define the number of the used reducer jobs public static final String COMPRESS_MAP_OUTPUT = "true"; //if the output from the mapping process should be compressed, set COMPRESS_MAP_OUTPUT = "true" (if not set it to "false") public static final String USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec"; //set the used codec for the data compression public static final boolean JOB_RUNNING_LOCAL = true; //if you run the Hadoop job on your local machine, you have to set JOB_RUNNING_LOCAL = true //if you run the Hadoop job on the Telefonica Cloud, you have to set JOB_RUNNING_LOCAL = false public static final String OUTPUT_PATH = "/home/hduser"; //set the folder, where the output is saved. Only needed, if JOB_RUNNING_LOCAL = false public static class KeyPartitioner extends Partitioner&lt;TextPair, Text&gt; { @Override public int getPartition(/*[*/TextPair key/*]*/, Text value, int numPartitions) { System.out.println("numPartitions: " + numPartitions); return (/*[*/key.getFirst().hashCode()/*]*/ &amp; Integer.MAX_VALUE) % numPartitions; } } private static Configuration hadoopconfig() { Configuration conf = new Configuration(); conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR); conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT); //conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC); conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER); return conf; } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub if ((args.length != 3) &amp;&amp; (JOB_RUNNING_LOCAL)) { System.err.println("Usage: Lookup &lt;CDR-inputPath&gt; &lt;Attribute-inputPath&gt; &lt;outputPath&gt;"); System.exit(2); } //starting the Hadoop job Configuration conf = hadoopconfig(); Job job = new Job(conf, "Join cdrs and attributes"); job.setJarByClass(Joiner.class); MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class); //FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //expecting a folder instead of a file if(JOB_RUNNING_LOCAL) FileOutputFormat.setOutputPath(job, new Path(args[2])); else FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); job.setPartitionerClass(KeyPartitioner.class); job.setGroupingComparatorClass(TextPair.FirstComparator.class); job.setReducerClass(LookupReducer.class); job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Joiner(), args); System.exit(exitCode); } } </code></pre> <hr> <pre><code>public class Attribute { public static final String ATT_TAG = "1"; public static class AttributeMapper extends Mapper&lt;LongWritable, Text, TextPair, Text&gt;{ private static Text values = new Text(); //private Object output = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //partition the input line by the separator semicolon String[] attributes = value.toString().split(";"); String valuesInString = ""; if(attributes.length != 5) System.err.println("Input column number not correct. Expected 5, provided " + attributes.length + "\n" + "Check the input file"); if(attributes.length == 5) { //setting the values with the input values read above valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4]; values.set(valuesInString); //writing out the key and value pair context.write( new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values); } } } } public class CDR { public static final String CDR_TAG = "0"; public static class CDRMapper extends Mapper&lt;LongWritable, Text, TextPair, Text&gt;{ private static Text values = new Text(); private Object output = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //partition the input line by the separator semicolon String[] cdr = value.toString().split(";"); //setting the values with the input values read above values.set(cdr[1]); //output = CDR_TAG + cdr[1]; //writing out the key and value pair context.write( new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values); } } } </code></pre> <p>I would be glad if anyone could at least post a link for a tutorial or a simple example where such a join functionality is implemented. I searched a lot, but either the code was not complete or there was not enough explanation.</p>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload