
# POMiniMRYarnCluster, run MR locally
I'm trying to run MR jobs locally using MiniMRYarnCluster. **I'm using the old MapReduce (not YARN) and the MapReduce API v2.** `MiniMRYarnCluster` comes from this dependency:

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.0.0-cdh4.1.1</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
```

Here is a portion of the log:

```
--127.0.1.1-58175-1358256748215, blockid: BP-1072059606-127.0.1.1-1358256746988:blk_6137856716359201843_1008, duration: 229871
13/01/15 17:32:34 INFO localizer.LocalizedResource: Resource hdfs://localhost:50123/apps_staging_dir/ssa/.staging/job_1358256748507_0001/job.xml transitioned from DOWNLOADING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZING to LOCALIZED
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZED to RUNNING
13/01/15 17:32:34 INFO nodemanager.DefaultContainerExecutor: launchContainer: [bash, /home/ssa/devel/POIClusterMapreduceTest/ru.mrjob.poi.POIClusterMapreduceTest-localDir-nm-0_1/usercache/ssa/appcache/application_1358256748507_0001/container_1358256748507_0001_01_000001/default_container_executor.sh]
13/01/15 17:32:34 WARN nodemanager.DefaultContainerExecutor: Exit code from task is : 1
13/01/15 17:32:34 INFO nodemanager.ContainerExecutor:
13/01/15 17:32:34 WARN launcher.ContainerLaunch: Container exited with a non-zero exit code 1
```

And here is the exception:

```
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/service/CompositeService
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.service.CompositeService
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    ... 12 more
Could not find the main class: org.apache.hadoop.mapreduce.v2.app.MRAppMaster. Program will exit.
```
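
*Side note on the stack trace: in this Hadoop 2.0.x line, `org.apache.hadoop.yarn.service.CompositeService` ships in `hadoop-yarn-common`, and `org.apache.hadoop.mapreduce.v2.app.MRAppMaster` in `hadoop-mapreduce-client-app`. The snippet below is only a sketch of test-scoped dependencies that are commonly declared next to the jobclient test-jar so those classes end up on the classpath; the exact artifact set for `2.0.0-cdh4.1.1` is an assumption, not a verified fix.*

```xml
<!-- Sketch only: typical companions of the jobclient test-jar for MiniMRYarnCluster tests.
     Artifact and version choices are assumptions for the 2.0.0-cdh4.1.1 line. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <!-- contains org.apache.hadoop.yarn.service.CompositeService in this Hadoop line -->
    <artifactId>hadoop-yarn-common</artifactId>
    <version>2.0.0-cdh4.1.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <!-- contains org.apache.hadoop.mapreduce.v2.app.MRAppMaster -->
    <artifactId>hadoop-mapreduce-client-app</artifactId>
    <version>2.0.0-cdh4.1.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <!-- MiniYARNCluster and related test infrastructure -->
    <artifactId>hadoop-yarn-server-tests</artifactId>
    <version>2.0.0-cdh4.1.1</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
```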
I've used `org.apache.hadoop.mapreduce.v2.TestMRJobs` as a base for my own test. Has anyone run into this problem?

Here is my code. It is an abstract base class for testing MR jobs locally on a CI server or a developer machine:

```java
public abstract class AbstractClusterMapReduceTest {

    private static final Log LOG = LogFactory.getLog(AbstractClusterMapReduceTest.class);

    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs";
    private static final int DEFAULT_NAMENODE_PORT = 50123;
    private static final int ONE_DATANODE = 1;
    private static final int DEFAULT_REDUCE_NUM_TASKS = 1;
    private static final String SLASH = "/";
    private static final String DEFAULT_MR_INPUT_DATA_FILE = "mr-input-data-file";

    private MiniMRYarnCluster mrCluster;
    private MiniDFSCluster dfsCluster;

    /** Shitty code from base Cloudera example */
    private static Path TEST_ROOT_DIR = new Path("target",
            AbstractClusterMapReduceTest.class.getName() + "-tmpDir").makeQualified(getLocalFileSystem());
    static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");

    private static FileSystem getLocalFileSystem() {
        try {
            return FileSystem.getLocal(new Configuration());
        } catch (IOException e) {
            throw new Error("Can't access local file system. MR cluster can't be started", e);
        }
    }

    /**
     * Always provide path to log catalog.
     * Default is: ${project.build.directory}/{@link AbstractClusterMapReduceTest#DEFAULT_LOG_CATALOG}
     */
    protected String getPathToLogCatalog() {
        return getPathToOutputDirectory() + SLASH + DEFAULT_LOG_CATALOG;
    }

    private String getPathToOutputDirectory() {
        return System.getProperty("project.build.directory");
    }

    private void checkAppJar() {
        if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
            throw new Error("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
        } else {
            LOG.info(MiniMRYarnCluster.APPJAR + " is at the right place. Can continue to setup Env...");
        }
    }

    public void setupEnv() throws IOException {
        checkAppJar();
        System.setProperty("hadoop.log.dir", getPathToLogCatalog());
        System.setProperty("javax.xml.parsers.SAXParserFactory",
                "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

        dfsCluster = buildMiniDFSCluster();
        //dfsCluster.getFileSystem().makeQualified(createPath(getHDFSPathToInputData()));
        //dfsCluster.getFileSystem().makeQualified(createPath(getOutputPath()));

        mrCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", getFileSystem().getUri().toString()); // use HDFS
        //conf.set(MRJobConfig.MR_AM_STAGING_DIR, getPathToOutputDirectory() + "/tmp-mapreduce");
        conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
        mrCluster.init(conf);
        mrCluster.start();

        // Cloudera tricks :)
        // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
        // workaround the absent public discache.
        getLocalFileSystem().copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
        getLocalFileSystem().setPermission(APP_JAR, new FsPermission("700"));
    }

    public void tearDown() {
        if (mrCluster != null) {
            mrCluster.stop();
            mrCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }

    public boolean createAndSubmitJob() throws IOException, ClassNotFoundException, InterruptedException {
        LOG.info("createAndSubmitJob: enter");
        checkAppJar();
        LOG.info("MRAppJar has been found. Can start to create Job");

        Configuration configuration = mrCluster.getConfig();
        configuration.set(MRConfig.MASTER_ADDRESS, "local");

        Job job = Job.getInstance(configuration);
        job.setJobName(this.getClass().getSimpleName() + "-job");
        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
        job.setJarByClass(getMRJobClass());
        job.setJobName(getMRJobClass().getSimpleName());
        job.setNumReduceTasks(getReduceNumTasks());
        job.setOutputKeyClass(getOutputKeyClass());
        job.setOutputValueClass(getOutputValueClass());
        job.setMapperClass(getMapperClass());
        job.setReducerClass(getReducerClass());
        job.setInputFormatClass(getInputFormat());
        job.setOutputFormatClass(getOutputFormat());

        FileInputFormat.setInputPaths(job, getHDFSPathToInputData());
        FileOutputFormat.setOutputPath(job, createPath(getOutputPath()));

        job.setSpeculativeExecution(false);
        job.setMaxMapAttempts(1); // speed up failures

        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Job has been submitted.");

        String trackingUrl = job.getTrackingURL();
        String jobId = job.getJobID().toString();
        LOG.info("trackingUrl:" + trackingUrl);
        LOG.info("jobId:" + jobId);

        return job.waitForCompletion(true);
    }

    protected FileSystem getFileSystem() throws IOException {
        return dfsCluster.getFileSystem();
    }

    protected int getReduceNumTasks() {
        return DEFAULT_REDUCE_NUM_TASKS;
    }

    /**
     * @return InputStream instance to the file you want to run with your MR job
     */
    protected InputStream getInputStreamForInputData() {
        return this.getClass().getClassLoader()
                .getResourceAsStream(this.getClass().getSimpleName() + "/" + getInputDatasetName());
        //return getPathToOutputDirectory() + SLASH + DEFAULT_INPUT_CATALOG + "/mr-input-data";
    }

    protected String getHDFSPathToInputData() throws IOException {
        InputStream inputStream = getInputStreamForInputData();
        Path hdfsInputPath = new Path(DEFAULT_MR_INPUT_DATA_FILE);
        FSDataOutputStream fsDataOutputStream = getFileSystem().create(hdfsInputPath);
        copyStream(inputStream, fsDataOutputStream);
        fsDataOutputStream.close();
        inputStream.close();
        return hdfsInputPath.toString();
    }

    private void copyStream(InputStream input, OutputStream output) throws IOException {
        byte[] buffer = new byte[1024]; // Adjust if you want
        int bytesRead;
        while ((bytesRead = input.read(buffer)) != -1) {
            output.write(buffer, 0, bytesRead);
        }
    }

    /**
     * Dataset should be placed in resources/ConcreteClusterMapReduceTest
     * @return a name of a file from the catalog.
     */
    protected abstract String getInputDatasetName();

    /**
     * @return path to the reducer output
     * default is: {@link AbstractClusterMapReduceTest#DEFAULT_OUTPUT_CATALOG}
     */
    protected String getOutputPath() {
        return "mr-data-output";
    }

    /**
     * Creates a {@link Path} using an absolute path to some FS resource.
     * @return new Path instance.
     */
    protected Path createPath(String pathToFSResource) {
        return new Path(pathToFSResource);
    }

    /**
     * Builds a new instance of MiniDFSCluster.
     * Defaults: {@link #DEFAULT_NAMENODE_PORT}, {@link #ONE_DATANODE}
     * @return MiniDFSCluster instance.
     */
    protected MiniDFSCluster buildMiniDFSCluster() throws IOException {
        return new MiniDFSCluster.Builder(new Configuration())
                .nameNodePort(DEFAULT_NAMENODE_PORT)
                .numDataNodes(ONE_DATANODE)
                .build();
    }

    protected abstract Class<? extends Configured> getMRJobClass();

    protected abstract Class<? extends Mapper> getMapperClass();

    protected abstract Class<? extends Reducer> getReducerClass();

    protected abstract Class<? extends InputFormat> getInputFormat();

    protected abstract Class<? extends OutputFormat> getOutputFormat();

    protected abstract Class<?> getOutputKeyClass();

    protected abstract Class<?> getOutputValueClass();
}
```

And the concrete test subclass:

```java
public class POIClusterMapreduceTest extends AbstractClusterMapReduceTest {

    private static final String INTEGRATION = "integration";

    @BeforeClass(groups = INTEGRATION)
    public void setup() throws IOException {
        super.setupEnv();
    }

    //@Test(groups = INTEGRATION)
    public void runJob() throws InterruptedException, IOException, ClassNotFoundException {
        boolean result = createAndSubmitJob();
        MatcherAssert.assertThat(result, Matchers.is(true));
        String outputResultAsString = getFileSystem().open(createPath(getOutputPath())).readUTF();
        MatcherAssert.assertThat(outputResultAsString.length(), Matchers.greaterThan(0));
    }

    @AfterClass(groups = INTEGRATION)
    public void tearDown() {
        super.tearDown();
    }

    @Override
    protected Class<Main> getMRJobClass() {
        return Main.class;
    }

    @Override
    protected Class<POIMapper> getMapperClass() {
        return POIMapper.class;
    }

    @Override
    protected Class<Reducer> getReducerClass() {
        return Reducer.class;
    }

    @Override
    protected Class<TextInputFormat> getInputFormat() {
        return TextInputFormat.class;
    }

    @Override
    protected Class<TextOutputFormat> getOutputFormat() {
        return TextOutputFormat.class;
    }

    @Override
    protected Class<LongWritable> getOutputKeyClass() {
        return LongWritable.class;
    }

    @Override
    protected Class<XVLRDataWritable> getOutputValueClass() {
        return XVLRDataWritable.class;
    }

    @Override
    protected String getInputDatasetName() {
        return "mr-input-data";
    }
}
```
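
*A note on how `runJob()` reads the result back: `getOutputPath()` points at the job's output directory, and `FSDataInputStream.readUTF()` only reads data that was written with `writeUTF`. With `TextOutputFormat`, the reducer output normally lands in `part-r-*` files under that directory (the standard `FileOutputFormat` naming). Below is a minimal sketch of a hypothetical helper, `MROutputReader`, that is not part of the code above; it just shows one way to collect that text output for assertions.*

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical helper: collects the text output of a TextOutputFormat job. */
public final class MROutputReader {

    private MROutputReader() {
    }

    /**
     * Reads every reducer output file (part-r-*) under the given output
     * directory and concatenates the lines into a single string.
     */
    public static String readTextOutput(FileSystem fs, Path outputDir) throws IOException {
        StringBuilder result = new StringBuilder();
        for (FileStatus status : fs.listStatus(outputDir)) {
            if (!status.getPath().getName().startsWith("part-")) {
                continue; // skip _SUCCESS and other marker files
            }
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(status.getPath()), "UTF-8"));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    result.append(line).append('\n');
                }
            } finally {
                reader.close();
            }
        }
        return result.toString();
    }
}
```

*In the test, this could replace the `readUTF()` call, e.g. `String outputResultAsString = MROutputReader.readTextOutput(getFileSystem(), createPath(getOutputPath()));`.*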