
Developing a Hive UDAF: I hit a ClassCastException and have no idea why
```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.util.StringUtils;

public class GenericUdafMemberLevel implements GenericUDAFResolver2 {
    private static final Log LOG = LogFactory
            .getLog(GenericUdafMemberLevel.class.getName());

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo)
            throws SemanticException {
        return new GenericUdafMeberLevelEvaluator();
    }

    @Override
    // parameter validation
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 2) { // number of parameters
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly two arguments are expected.");
        }
        // both parameters must be primitive types
        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only primitive type arguments are accepted but "
                            + parameters[0].getTypeName() + " is passed.");
        }
        if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(1,
                    "Only primitive type arguments are accepted but "
                            + parameters[1].getTypeName() + " is passed.");
        }
        return new GenericUdafMeberLevelEvaluator();
    }

    public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {
        private PrimitiveObjectInspector inputOI;
        private PrimitiveObjectInspector inputOI2;
        private DoubleWritable result;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
                inputOI2 = (PrimitiveObjectInspector) parameters[1];
                result = new DoubleWritable(0);
            }
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }

        /** class for storing count value. */
        static class SumAgg implements AggregationBuffer {
            boolean empty;
            double value;
        }

        @Override
        // allocate the memory a new aggregation needs, used to hold the running sum
        // across the mapper, combiner and reducer phases.
        // before using the buffer object, clear it first with reset()
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAgg buffer = new SumAgg();
            reset(buffer);
            return buffer;
        }

        @Override
        // reset to 0
        // MapReduce may reuse mappers and reducers, so for compatibility the buffer
        // memory needs to be reusable as well.
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAgg) agg).value = 0.0;
            ((SumAgg) agg).empty = true;
        }

        private boolean warned = false;

        // iterate
        // called in the map phase: just add the input parameter onto the agg object
        // that keeps the current sum.
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            // parameters == null means the input table/split is empty
            if (parameters == null) {
                return;
            }
            try {
                double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);
                if (flag > 1.0) // condition on the second parameter
                    merge(agg, parameters[0]); // hand the per-map result to the combiner to merge
            } catch (NumberFormatException e) {
                if (!warned) {
                    warned = true;
                    LOG.warn(getClass().getSimpleName() + " "
                            + StringUtils.stringifyException(e));
                }
            }
        }

        @Override
        // the combiner merges what the map returned, and the reducer merges what the
        // mappers or combiners returned.
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                // read each field's value through the ObjectInspector
                double p = PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
                ((SumAgg) agg).value += p;
            }
        }

        @Override
        // the reducer returns the result; or, when there is only a mapper and no
        // reducer, the result is returned on the mapper side.
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((SumAgg) agg).value);
            return result;
        }
    }
}
```

I have commented the code to explain the intent. The idea of the UDAF is this: in `select test_sum(col1, col2) from tbl;`, if col2 satisfies some condition, then sum col1's values. Most of the code is copied from the official avg() UDAF.

I ran into a weird exception:

```
java.lang.RuntimeException: Hive Runtime Error while closing operators
    at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:226)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1132)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:193)
    ... 8 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector.get(WritableLongObjectInspector.java:35)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:323)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:255)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:202)
    at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:236)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:474)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:800)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:1061)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1113)
    ... 13 more
```

Is there something wrong with my UDAF? Please kindly point it out. Thanks a lot.
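For context, here is a stripped-down sketch of the evaluator pattern I think I am supposed to be following, reduced to a plain double sum with the col2 condition removed. The class name `SumDoubleSketch` and the unconditional summing are made up for this sketch; it is only meant to show how the ObjectInspector returned from init() is paired with the Writable that terminate()/terminatePartial() actually return, under the assumption that the final result should be a double.

```java
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

// Illustrative sketch only, not my real class: an evaluator that sums one double
// column unconditionally. Mode handling is simplified: parameters[0] is either the
// raw column (PARTIAL1/COMPLETE) or the partial sum (PARTIAL2/FINAL), and getDouble
// can read both through the same inspector.
public class SumDoubleSketch extends GenericUDAFEvaluator {
    private PrimitiveObjectInspector inputOI;
    private final DoubleWritable result = new DoubleWritable(0);

    static class SumAgg implements AggregationBuffer {
        double value;
    }

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        inputOI = (PrimitiveObjectInspector) parameters[0];
        // The inspector returned here describes whatever terminate()/terminatePartial()
        // hand back; result is a DoubleWritable, so a writable double is advertised.
        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        SumAgg buffer = new SumAgg();
        reset(buffer);
        return buffer;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((SumAgg) agg).value = 0.0;
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (parameters == null || parameters[0] == null) {
            return;
        }
        ((SumAgg) agg).value += PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI);
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        return terminate(agg);
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (partial != null) {
            ((SumAgg) agg).value += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
        }
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        result.set(((SumAgg) agg).value);
        return result;
    }
}
```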