
Pig UDF bag of tuples causes error "Long cannot be cast to Tuple"
I have a Java UDF that takes tuples and returns a bag of tuples. When I operate on that bag (see code below) I get the error message:

> 2013-12-18 14:32:33,943 [main] ERROR org.apache.pig.tools.pigstats.PigStats - ERROR: java.lang.Long cannot be cast to org.apache.pig.data.Tuple

I cannot recreate this error just by reading in data, grouping, and flattening; it only happens with the bag of tuples returned by the UDF, even when the `DESCRIBE`-ed data looks identical to the result of group/flatten/etc.

UPDATE: Here is actual code that reproduces the error. (A thousand thanks to anyone who takes the time to read through it.)

```
REGISTER test.jar;

A = LOAD 'test-input.txt' using PigStorage(',')
    AS (id:long, time:long, lat:double, lon:double, alt:double);

A_grouped = GROUP A BY (id);

U_out = FOREACH A_grouped GENERATE FLATTEN( test.Test(A) );
DESCRIBE U_out;

V = FOREACH U_out GENERATE output_tuple.id, output_tuple.time;
DESCRIBE V;

rmf test.out
STORE V INTO 'test.out' using PigStorage(',');
```

File `test-input.txt`:

```
0,1000,33,-100,5000
0,1010,33,-101,6000
0,1020,33,-102,7000
0,1030,33,-103,8000
1,1100,34,-100,15000
1,1110,34,-101,16000
1,1120,34,-102,17000
1,1130,34,-103,18000
```
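Aside: `V` projects `output_tuple.id` and `output_tuple.time`, which presupposes that each flattened record consists of a single tuple-valued field. To make that nesting concrete, here is a small sketch of mine (not part of the original script; the class name `NestingDemo` is hypothetical) contrasting a flat five-field tuple with one wrapped inside an outer tuple:

```java
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Sketch only: the shape V's projection output_tuple.id presupposes,
// versus a flat record like one row of test-input.txt.
public class NestingDemo {
    public static void main(String[] args) throws Exception {
        TupleFactory tf = TupleFactory.getInstance();

        // A flat record: five top-level fields, field 0 is a Long.
        Tuple flat = tf.newTuple();
        flat.append(0L);
        flat.append(1000L);
        flat.append(33.0);
        flat.append(-100.0);
        flat.append(5000.0);

        // A nested record: one top-level field, which is itself a Tuple.
        // This is what a field named output_tuple of type tuple describes.
        Tuple nested = tf.newTuple();
        nested.append(flat);

        System.out.println(flat);    // (0,1000,33.0,-100.0,5000.0)
        System.out.println(nested);  // ((0,1000,33.0,-100.0,5000.0))
    }
}
```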
The output:

```
$ pig -x local test.pig
2013-12-18 16:47:50,467 [main] INFO org.apache.pig.Main - Logging error messages to: /home/jsnider/pig_1387403270431.log
2013-12-18 16:47:50,751 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
U_out: {bag_of_tuples::output_tuple: (id: long,time: long,lat: double,lon: double,alt: double)}
V: {id: long,time: long}
2013-12-18 16:47:51,532 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
2013-12-18 16:47:51,532 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.
2013-12-18 16:47:51,907 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: V: Store(file:///home/jsnider/test.out:PigStorage(',')) - scope-32 Operator Key: scope-32)
2013-12-18 16:47:51,929 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2013-12-18 16:47:51,988 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2013-12-18 16:47:51,988 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2013-12-18 16:47:51,996 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.AccumulatorOptimizer - Reducer is to run in accumulative mode.
2013-12-18 16:47:52,139 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
2013-12-18 16:47:52,158 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2013-12-18 16:47:52,199 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2013-12-18 16:47:54,225 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2013-12-18 16:47:54,249 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=164
2013-12-18 16:47:54,249 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2013-12-18 16:47:54,299 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2013-12-18 16:47:54,299 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2013-12-18 16:47:54,308 [Thread-1] INFO org.apache.hadoop.util.NativeCodeLoader - Loaded the native-hadoop library
2013-12-18 16:47:54,601 [Thread-1] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-12-18 16:47:54,601 [Thread-1] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2013-12-18 16:47:54,627 [Thread-1] WARN org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library is available
2013-12-18 16:47:54,627 [Thread-1] INFO org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library loaded
2013-12-18 16:47:54,633 [Thread-1] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2013-12-18 16:47:54,801 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2013-12-18 16:47:54,965 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.system.dir; Ignoring.
2013-12-18 16:47:54,966 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.trash.interval; Ignoring.
2013-12-18 16:47:54,966 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.retain.hours; Ignoring.
2013-12-18 16:47:54,968 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.limit.kb; Ignoring.
2013-12-18 16:47:54,970 [Thread-1] WARN org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.temp.dir; Ignoring.
2013-12-18 16:47:54,991 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
2013-12-18 16:47:54,994 [pool-1-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local_0001_m_000000_0
2013-12-18 16:47:55,047 [pool-1-thread-1] INFO org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
2013-12-18 16:47:55,053 [pool-1-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@ffeef1
2013-12-18 16:47:55,058 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - Processing split: Number of splits :1 Total Length = 164 Input split[0]: Length = 164 Locations: -----------------------
2013-12-18 16:47:55,068 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2013-12-18 16:47:55,118 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2013-12-18 16:47:55,118 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2013-12-18 16:47:55,152 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2013-12-18 16:47:55,164 [pool-1-thread-1] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2013-12-18 16:47:55,167 [pool-1-thread-1] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-12-18 16:47:55,170 [pool-1-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner -
2013-12-18 16:47:55,171 [pool-1-thread-1] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
2013-12-18 16:47:55,171 [pool-1-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Finishing task: attempt_local_0001_m_000000_0
2013-12-18 16:47:55,172 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
2013-12-18 16:47:55,192 [Thread-2] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@38650646
2013-12-18 16:47:55,192 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner -
2013-12-18 16:47:55,196 [Thread-2] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2013-12-18 16:47:55,201 [Thread-2] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 418 bytes
2013-12-18 16:47:55,201 [Thread-2] INFO org.apache.hadoop.mapred.LocalJobRunner -
2013-12-18 16:47:55,257 [Thread-2] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.pig.data.Tuple
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:408)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:138)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:312)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:360)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:392)
2013-12-18 16:47:55,477 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
2013-12-18 16:47:59,995 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - job job_local_0001 has failed! Stop running all dependent jobs
2013-12-18 16:48:00,008 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2013-12-18 16:48:00,010 [main] ERROR org.apache.pig.tools.pigstats.PigStatsUtil - 1 map reduce job(s) failed!
2013-12-18 16:48:00,011 [main] INFO org.apache.pig.tools.pigstats.PigStats - Detected Local mode. Stats reported below may be incomplete
2013-12-18 16:48:00,015 [main] INFO org.apache.pig.tools.pigstats.PigStats - Script Statistics:

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
0.20.2-cdh3u6	0.8.1-cdh3u6	jsnider	2013-12-18 16:47:52	2013-12-18 16:48:00	GROUP_BY

Failed!

Failed Jobs:
JobId	Alias	Feature	Message	Outputs
job_local_0001	A,A_grouped,U_out,V	GROUP_BY	Message: Job failed! Error - NA	file:///home/jsnider/test.out,

Input(s):
Failed to read data from "file:///home/jsnider/test-input.txt"

Output(s):
Failed to produce result in "file:///home/jsnider/test.out"

Job DAG:
job_local_0001

2013-12-18 16:48:00,015 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed!
2013-12-18 16:48:00,040 [main] ERROR org.apache.pig.tools.grunt.GruntParser - ERROR 2244: Job failed, hadoop does not return any error message
Details at logfile: /home/jsnider/pig_1387403270431.log
```
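Reading the stack trace: the cast fails inside POProject.getNext while the reduce-side FOREACH evaluates `V`'s projection of `output_tuple`. In other words, the value found in that position of the flattened record is a Long, not the Tuple the schema promises. In miniature (my illustration of the failing cast, not Pig's actual code; `CastDemo` is a made-up name):

```java
import org.apache.pig.data.Tuple;

// Miniature of the failing cast, not Pig's code: the projection fetches a
// field and casts it to Tuple, but the value actually stored there is a Long.
public class CastDemo {
    public static void main(String[] args) {
        Object field0 = Long.valueOf(1000L); // what the record really holds
        Tuple t = (Tuple) field0;            // throws java.lang.ClassCastException
        System.out.println(t);
    }
}
```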
And the three Java files:

Test.java

```java
package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Test extends EvalFunc<DataBag> implements Accumulator<DataBag> {

    public static ArrayList<Point> points = null;

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        accumulate(input);
        DataBag output = getValue();
        cleanup();
        return output;
    }

    public void accumulate(DataBag b) throws IOException {
        try {
            if (b == null)
                return;
            Iterator<Tuple> fit = b.iterator();
            while (fit.hasNext()) {
                Tuple f = fit.next();
                storePt(f);
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    public void accumulate(Tuple b) throws IOException {
        try {
            if (b == null || b.size() == 0)
                return;
            for (Object f : b.getAll()) {
                if (f instanceof Tuple) {
                    storePt((Tuple) f);
                } else if (f instanceof DataBag) {
                    accumulate((DataBag) f);
                } else {
                    throw new IOException("tuple input is not a tuple or a databag... x__x");
                }
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    @Override
    public DataBag getValue() {
        if (points == null)
            points = new ArrayList<Point>();
        Collections.sort(points);
        DataBag myBag = BagFactory.getInstance().newDefaultBag();
        for (Point pt : points) {
            Measure sm = new Measure(pt);
            myBag.add(sm.asTuple());
        }
        return myBag;
    }

    public void cleanup() {
        points = null;
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema tupleFs = new Schema.FieldSchema("output_tuple", Measure.smSchema(), DataType.TUPLE);
            Schema bagSchema = new Schema(tupleFs);
            Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples", bagSchema, DataType.BAG);
            return new Schema(bagFs);
        } catch (Exception e) {
            return null;
        }
    }

    public static void storePt(Tuple f) {
        Object[] field = f.getAll().toArray();
        Point pt = new Point(
            field[0] == null ? 0 : (Long) field[0],
            field[1] == null ? 0 : (Long) field[1],
            field[2] == null ? 0 : (Double) field[2],
            field[3] == null ? 0 : (Double) field[3],
            field[4] == null ? Double.MIN_VALUE : (Double) field[4]
        );
        if (points == null)
            points = new ArrayList<Point>();
        points.add(pt);
    }
}
```
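One way to inspect what the bag returned by `exec()` actually contains, independent of Pig's schema bookkeeping, would be a tiny standalone harness along these lines (hypothetical code of mine, not from the original post; it assumes the Pig jar and `test.jar` are on the classpath, and the class name `Harness` is made up):

```java
package test;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical harness: feed Test.exec() one grouped row and print each
// returned tuple together with the runtime class of its first field, to
// compare the real bag contents against the declared output schema.
public class Harness {
    public static void main(String[] args) throws Exception {
        TupleFactory tf = TupleFactory.getInstance();

        // One row of test-input.txt: id, time, lat, lon, alt.
        Tuple row = tf.newTuple();
        row.append(0L);
        row.append(1000L);
        row.append(33.0);
        row.append(-100.0);
        row.append(5000.0);

        DataBag group = BagFactory.getInstance().newDefaultBag();
        group.add(row);

        // Mimic the tuple Pig passes to test.Test(A): a single bag-valued field.
        Tuple input = tf.newTuple();
        input.append(group);

        DataBag out = new Test().exec(input);
        for (Tuple t : out) {
            System.out.println(t + "  field 0 is " + t.get(0).getClass().getName());
        }
    }
}
```

If this prints field 0 as `java.lang.Long` while `DESCRIBE` reports a single nested `output_tuple`, the runtime data and the declared schema disagree, which would be consistent with the ClassCastException above.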
Point.java:

```java
package test;

public class Point implements Comparable<Point> {

    long id;
    long time;
    double lat;
    double lon;
    double alt;

    public Point(Point c) {
        this.id = c.id;
        this.time = c.time;
        this.lat = c.lat;
        this.lon = c.lon;
        this.alt = c.alt;
    }

    public Point(long l, long m, double d, double e, double f) {
        id = l;
        time = m;
        lat = d;
        lon = e;
        alt = f;
    }

    @Override
    public int compareTo(Point other) {
        final int BEFORE = -1;
        final int EQUAL = 0;
        final int AFTER = 1;
        if (this == other) return EQUAL;
        if (this.id < other.id) return BEFORE;
        if (this.id > other.id) return AFTER;
        if (this.time < other.time) return BEFORE;
        if (this.time > other.time) return AFTER;
        if (this.lat > other.lat) return BEFORE;
        if (this.lat < other.lat) return AFTER;
        if (this.lon > other.lon) return BEFORE;
        if (this.lon < other.lon) return AFTER;
        if (this.alt > other.alt) return BEFORE;
        if (this.alt < other.alt) return AFTER;
        return EQUAL;
    }

    public String toString() {
        return id + " " + time;
    }
}
```

Measure.java:

```java
package test;

import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Measure {

    private long id;
    private long time;
    private double lat;
    private double lon;
    private double alt;

    public Measure(Point pt) {
        id = pt.id;
        time = pt.time;
        lat = pt.lat;
        lon = pt.lon;
        alt = pt.alt;
    }

    public Tuple asTuple() {
        Tuple myTuple = TupleFactory.getInstance().newTuple();
        myTuple.append(id);
        myTuple.append(time);
        myTuple.append(lat);
        myTuple.append(lon);
        myTuple.append(alt);
        return myTuple;
    }

    public static Schema smSchema() {
        Schema tupleSchema = new Schema();
        tupleSchema.add(new Schema.FieldSchema("id", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("time", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("lat", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("lon", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("alt", DataType.DOUBLE));
        return tupleSchema;
    }
}
```
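For what it's worth: `outputSchema()` declares each bag element as a single nested tuple named `output_tuple`, while `Measure.asTuple()` appends the five values at the top level of the bag element itself. If the data were meant to match the declared nesting, a wrapper along the following lines would produce it. This is an untested sketch of mine, not a fix confirmed anywhere in the post, and the class name `NestedMeasure` is made up:

```java
package test;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Untested sketch: wrap a Measure's flat tuple in an outer tuple, so each
// bag element has exactly one field, itself a Tuple, matching the declared
// schema bag_of_tuples{output_tuple:(id,time,lat,lon,alt)}.
public final class NestedMeasure {
    private NestedMeasure() {}

    public static Tuple asNestedTuple(Measure m) {
        Tuple outer = TupleFactory.getInstance().newTuple();
        outer.append(m.asTuple());   // field 0 is the (id,time,lat,lon,alt) tuple
        return outer;
    }
}
```

Under that assumption, `getValue()` would add `NestedMeasure.asNestedTuple(sm)` to the bag instead of `sm.asTuple()`.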