
  1. ClassCastException while writing to Cassandra from hadoop job
    primarykey
    data
    text
    <p>I am running a hadoop job and trying to write the output to Cassandra. I am getting the following exception:</p> <pre><code>java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to java.nio.ByteBuffer
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter.write(ColumnFamilyRecordWriter.java:60)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:514)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
</code></pre> <p>I modeled my map reduce code on the WordCount example given at <a href="https://wso2.org/repos/wso2/trunk/carbon/dependencies/cassandra/contrib/word_count/src/WordCount.java" rel="nofollow">https://wso2.org/repos/wso2/trunk/carbon/dependencies/cassandra/contrib/word_count/src/WordCount.java</a></p> <p>Here's my MR code:</p> <pre><code>public class SentimentAnalysis extends Configured implements Tool {

    static final String KEYSPACE = "Travel";
    static final String OUTPUT_COLUMN_FAMILY = "Keyword_PtitleId";

    public static class Map extends Mapper&lt;LongWritable, Text, Text, LongWritable&gt; {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            Sentiment sentiment = null;
            try {
                sentiment = (Sentiment) PojoMapper.fromJson(line, Sentiment.class);
            } catch(Exception e) {
                return;
            }
            if(sentiment != null &amp;&amp; sentiment.isLike()) {
                word.set(sentiment.getNormKeyword());
                context.write(word, new LongWritable(sentiment.getPtitleId()));
            }
        }
    }

    public static class Reduce extends Reducer&lt;Text, LongWritable, ByteBuffer, List&lt;Mutation&gt;&gt; {
        private ByteBuffer outputKey;

        public void reduce(Text key, Iterator&lt;LongWritable&gt; values, Context context) throws IOException, InterruptedException {
            List&lt;Long&gt; ptitles = new ArrayList&lt;Long&gt;();
            java.util.Map&lt;Long, Integer&gt; ptitleToFrequency = new HashMap&lt;Long, Integer&gt;();
            while (values.hasNext()) {
                Long value = values.next().get();
                ptitles.add(value);
            }
            for(Long ptitle : ptitles) {
                if(ptitleToFrequency.containsKey(ptitle)) {
                    ptitleToFrequency.put(ptitle, ptitleToFrequency.get(ptitle) + 1);
                } else {
                    ptitleToFrequency.put(ptitle, 1);
                }
            }
            byte[] keyBytes = key.getBytes();
            outputKey = ByteBuffer.wrap(Arrays.copyOf(keyBytes, keyBytes.length));
            for(Long ptitle : ptitleToFrequency.keySet()) {
                context.write(outputKey, Collections.singletonList(getMutation(new Text(ptitle.toString()), ptitleToFrequency.get(ptitle))));
            }
        }

        private static Mutation getMutation(Text word, int sum) {
            Column c = new Column();
            byte[] wordBytes = word.getBytes();
            c.name = ByteBuffer.wrap(Arrays.copyOf(wordBytes, wordBytes.length));
            c.value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
            c.timestamp = System.currentTimeMillis() * 1000;
            Mutation m = new Mutation();
            m.column_or_supercolumn = new ColumnOrSuperColumn();
            m.column_or_supercolumn.column = c;
            return m;
        }
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new SentimentAnalysis(), args);
        System.exit(ret);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "SentimentAnalysis");
        job.setJarByClass(SentimentAnalysis.class);
        String inputFile = args[0];
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
        FileInputFormat.setInputPaths(job, inputFile);
        ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
</code></pre> <p>If you look under the Reduce class, I am converting the Text field (key) to ByteBuffer properly.</p> <p>Would appreciate some pointers on how to fix this.</p>
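    <p>One observation worth double-checking (a possible cause, not a confirmed fix): the stack trace points at <code>Reducer.reduce(Reducer.java:156)</code>, which is the <em>base-class</em> implementation. In the new <code>org.apache.hadoop.mapreduce</code> API the hook method takes an <code>Iterable&lt;VALUEIN&gt;</code>, so a <code>reduce</code> declared with <code>Iterator</code> is an overload that never gets called, and the default identity reduce passes the <code>Text</code> key straight through to the record writer. A JDK-only sketch (hypothetical class names standing in for Hadoop's) of that overload-vs-override pitfall:</p>

```java
import java.util.Iterator;
import java.util.List;

public class OverrideDemo {
    static class BaseReducer {
        // Stands in for org.apache.hadoop.mapreduce.Reducer.reduce(key, Iterable, ctx):
        // the default implementation just passes records through unchanged.
        String reduce(String key, Iterable<Long> values) {
            return "default reduce (identity)";
        }
    }

    static class MyReducer extends BaseReducer {
        // Iterator instead of Iterable: this is an OVERLOAD, not an override,
        // so it is never invoked through the base-class signature.
        String reduce(String key, Iterator<Long> values) {
            return "custom reduce";
        }
    }

    public static void main(String[] args) {
        BaseReducer r = new MyReducer();
        // Dynamic dispatch resolves against the Iterable signature,
        // landing in the base class despite the subclass "reduce".
        System.out.println(r.reduce("k", List.of(1L, 2L)));  // prints "default reduce (identity)"
    }
}
```

    <p>If that is what is happening here, changing the signature to <code>reduce(Text key, Iterable&lt;LongWritable&gt; values, Context context)</code> (and adding <code>@Override</code> so the compiler verifies the match) would make the custom reducer actually run.</p>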