Version: Hadoop 2.2.0

The source code can be downloaded from https://github.com/hortonworks/simple-yarn-app. I had tried this simple-yarn-app repeatedly without success, which was surprising: as YARN's "hello world" it should not be that hard to run. After much troubleshooting, the problem turned out to be the classpath.

First, set up the environment as described in http://blog.csdn.net/fansy1990/article/details/22896249.

By "a classpath problem" I mean that Linux and Windows construct the Java classpath differently. With the unmodified source from GitHub (I was submitting the job from Windows, which is why the problem appeared; submitting from Linux would not trigger it), the classpath observed at a breakpoint was:
{CLASSPATH=$HADOOP_CONF_DIR;$HADOOP_COMMON_HOME/share/hadoop/common/*;$HADOOP_COMMON_HOME/share/hadoop/common/lib/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*;$HADOOP_YARN_HOME/share/hadoop/yarn/*;$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*;%PWD%\*}

With the modified source, the classpath becomes:

{CLASSPATH=$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$PWD/*}

Note the differences: semicolons versus colons as path separators, and %PWD% versus $PWD for variable expansion. The container actually runs on Linux, so only the second form works there.
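A small check that illustrates where the client-OS dependence comes from (this is my reading of the Hadoop 2.2.0 utilities, so treat it as an assumption rather than a confirmed diagnosis): both java.io.File.pathSeparator and Environment.$() follow the OS of the JVM they run in, i.e. the submitting client, not the Linux node where the container will run.

import java.io.File;

import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;

public class SeparatorCheck {
  public static void main(String[] args) {
    // Prints ";" when run on a Windows client, ":" on Linux.
    System.out.println("File.pathSeparator = " + File.pathSeparator);
    // Prints "%PWD%" on Windows, "$PWD" on Linux.
    System.out.println("Environment.PWD.$() = " + Environment.PWD.$());
  }
}

This is why the modified Client below hardcodes ":" and "$PWD" instead of relying on these helpers.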
The Client source is as follows:
package com.hortonworks.simpleyarnapp;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Client {

  Logger log = LoggerFactory.getLogger(Client.class);
  Configuration conf = new YarnConfiguration();

  public void run(String[] args) throws Exception {
    final String command = args[0];
    final int n = Integer.valueOf(args[1]);
    final Path jarPath = new Path(args[2]);

    // Create yarnClient
    // YarnConfiguration extends Configuration
    // YarnConfiguration conf = new YarnConfiguration();
    conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032");
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // Create application via yarnClient
    YarnClientApplication app = yarnClient.createApplication();

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(
        Collections.singletonList(
            "$JAVA_HOME/bin/java" +
            " -Xmx256M" +
            /* " com.hortonworks.simpleyarnapp.Work" + */
            " com.hortonworks.simpleyarnapp.ApplicationMaster" +
            " " + command +
            " " + String.valueOf(n) +
            " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
            " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
            )
        );

    // Setup jar for ApplicationMaster
    LocalResource appMasterJar = Records.newRecord(LocalResource.class);
    setupAppMasterJar(jarPath, appMasterJar);
    amContainer.setLocalResources(
        Collections.singletonMap("simpleapp.jar", appMasterJar));

    // Setup CLASSPATH for ApplicationMaster
    Map<String, String> appMasterEnv = new HashMap<String, String>();
    setupAppMasterEnv(appMasterEnv);
    amContainer.setEnvironment(appMasterEnv);

    // Set up resource type requirements for ApplicationMaster
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(256);
    capability.setVirtualCores(1);

    // Finally, set-up ApplicationSubmissionContext for the application
    ApplicationSubmissionContext appContext =
        app.getApplicationSubmissionContext();
    appContext.setApplicationName("simple-yarn-app"); // application name
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);
    appContext.setQueue("default"); // queue

    // Submit application
    ApplicationId appId = appContext.getApplicationId();
    System.out.println("Submitting application " + appId);
    log.info("Submitting application " + appId);
    try {
      yarnClient.submitApplication(appContext);
    } catch (Exception e) {
      e.printStackTrace();
    }

    /* log.info("-----------------------------------");
    for (ApplicationReport appli : yarnClient.getApplications()) {
      log.info("appli.getApplicationType():" + appli.getApplicationType() + "\n"
          + "appli.getHost():" + appli.getHost() + "\n"
          + "appli.getOriginalTrackingUrl():" + appli.getOriginalTrackingUrl() + "\n"
          + "appli.getTrackingUrl():" + appli.getTrackingUrl() + "\n"
          + "appli.getUser():" + appli.getUser());
    }
    log.info("--------------------------------------"); */

    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
    YarnApplicationState appState = appReport.getYarnApplicationState();
    while (appState != YarnApplicationState.FINISHED &&
           appState != YarnApplicationState.KILLED &&
           appState != YarnApplicationState.FAILED) {
      Thread.sleep(100);
      appReport = yarnClient.getApplicationReport(appId);
      appState = appReport.getYarnApplicationState();
    }

    System.out.println(
        "Application " + appId + " finished with" +
        " state " + appState +
        " at " + appReport.getFinishTime());
  }

  private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar)
      throws IOException {
    FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
    appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    appMasterJar.setSize(jarStat.getLen());
    appMasterJar.setTimestamp(jarStat.getModificationTime());
    appMasterJar.setType(LocalResourceType.FILE);
    appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
  }

  private static void addToEnvironment(
      Map<String, String> environment,
      String variable, String value) {
    String val = environment.get(variable);
    String separator = ":";
    if (val == null) {
      val = value;
    } else {
      val = val + separator + value;
    }
    environment.put(StringInterner.weakIntern(variable),
        StringInterner.weakIntern(val));
  }

  private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
      addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
    }
    /* Apps.addToEnvironment(appMasterEnv,
        Environment.CLASSPATH.name(),
        Environment.PWD.$() + File.separator + "*"); */
    addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
        "$PWD" + Path.SEPARATOR + "*");
  }

  public static void main(String[] args) throws Exception {
    Client c = new Client();
    String[] arg = {"/root/myShell.sh", "1", "hdfs://node31:9000/input/"};
    /* String[] arg = {"java", "1", "hdfs://node31:9000/input/"}; */
    c.run(arg);
  }
}
The ApplicationMaster source is as follows (this one appears to be unchanged from the original):
package com.hortonworks.simpleyarnapp;

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationMaster {

  static Logger log = LoggerFactory.getLogger(ApplicationMaster.class);

  public static void main(String[] args) throws Exception {
    final String command = args[0];
    final int n = Integer.valueOf(args[1]);

    if (log.isDebugEnabled()) {
      log.debug("Entering the ApplicationMaster");
    }

    // Initialize clients to ResourceManager and NodeManagers
    Configuration conf = new YarnConfiguration();
    /* conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032"); */

    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();

    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Register with ResourceManager
    System.out.println("registerApplicationMaster 0");
    rmClient.registerApplicationMaster("", 0, "");
    System.out.println("registerApplicationMaster 1");

    // Priority for worker containers - priorities are intra-application
    Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(0);

    // Resource requirements for worker containers
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(128);
    capability.setVirtualCores(1);

    // Make container requests to ResourceManager
    for (int i = 0; i < n; ++i) {
      ContainerRequest containerAsk =
          new ContainerRequest(capability, null, null, priority);
      System.out.println("Making res-req " + i);
      rmClient.addContainerRequest(containerAsk);
    }

    // Obtain allocated containers and launch
    int allocatedContainers = 0;
    while (allocatedContainers < n) {
      AllocateResponse response = rmClient.allocate(0);
      for (Container container : response.getAllocatedContainers()) {
        ++allocatedContainers;

        // Launch container by create ContainerLaunchContext
        ContainerLaunchContext ctx =
            Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(
            Collections.singletonList(
                command +
                /* "$JAVA_HOME/bin/java" + */
                /* "/bin/bash /root/myShell.sh" + */
                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
                ));
        System.out.println("Launching container " + allocatedContainers);
        nmClient.startContainer(container, ctx);
      }
      Thread.sleep(100);
    }

    // Now wait for containers to complete
    int completedContainers = 0;
    while (completedContainers < n) {
      AllocateResponse response = rmClient.allocate(completedContainers / n);
      for (ContainerStatus status : response.getCompletedContainersStatuses()) {
        ++completedContainers;
        System.out.println("Completed container " + completedContainers);
      }
      Thread.sleep(100);
    }

    // Un-register with ResourceManager
    rmClient.unregisterApplicationMaster(
        FinalApplicationStatus.SUCCEEDED, "", "");
  }
}
Note that the Configuration in the Client needs the explicit conf.set(...) calls, whereas the AppMaster does not: the AM runs on a cluster node and picks up the cluster configuration from its classpath.

Compile the two files above, package them into a jar, and place it under $HADOOP_HOME/share/hadoop/yarn/lib.
Write the shell script: put the following into /root/myShell.sh:
#!/bin/bash
touc "/root/a.txt"
cho "oh ,it works !" > /root/a.txt
Note that the script deliberately contains errors: touc should be touch, and cho should be echo.

Then run the Client program and check whether /root/a.txt exists. If it does, the shell script was indeed executed; if not, something is wrong. (Normally the file will exist even with the broken script, because the > redirection creates a.txt before bash discovers that cho is not a valid command.) If you correct the errors in the script, a.txt will also contain the text: oh, it works!
From this we can tell that the shell script actually failed, yet the ResourceManager log shows the task as successful, with no other error messages. So it seems this program merely launches the shell command and does not care whether it succeeded or failed. Indeed, looking at the AppMaster code above, it always calls unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, ...) and never inspects the containers' exit codes, which would explain the behavior.

Or perhaps my shell script is just too simple, lacking any failure handling or fault-tolerance logic? Note that the AppMaster does redirect the command's output to logs:
ctx.setCommands(
    Collections.singletonList(
        command +
        /* "$JAVA_HOME/bin/java" + */
        /* "/bin/bash /root/myShell.sh" + */
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
        ));
This log should capture the command's own output, although I was initially unsure where it ends up; LOG_DIR_EXPANSION_VAR expands to the container's log directory on the NodeManager node (typically under the NodeManager's log directory, e.g. .../logs/userlogs/<applicationId>/<containerId>/). Since this log records the command's output and errors, finding it shows how the command actually ran. But does the command's outcome ever reach the cluster's ResourceManager? If the ResourceManager could obtain the command's exit status, it ought to use that as the job's final state.

The command runs inside a container, and the ResourceManager should be able to obtain the container's execution status. So is the problem that my shell script fails without the container noticing? In fact the NodeManager does report each container's exit code back to the AM (as a ContainerStatus in getCompletedContainersStatuses()), but this AppMaster never looks at it, which is why the job is always reported as successful.
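If we wanted the final job state to reflect the shell's exit code, one option (a sketch of mine, not part of the original app) is to check ContainerStatus.getExitStatus() in the AppMaster's completion loop and unregister with FAILED when any container exited non-zero:

// Sketch: a replacement for the AppMaster's completion loop that propagates
// container exit codes into the final application status.
int completedContainers = 0;
boolean anyFailed = false;
while (completedContainers < n) {
  AllocateResponse response = rmClient.allocate((float) completedContainers / n);
  for (ContainerStatus status : response.getCompletedContainersStatuses()) {
    ++completedContainers;
    // A non-zero exit status means the command failed inside the container;
    // for the broken myShell.sh, bash should exit with 127 (command not found).
    if (status.getExitStatus() != 0) {
      anyFailed = true;
      System.out.println("Container failed: exit status " + status.getExitStatus()
          + ", diagnostics: " + status.getDiagnostics());
    }
  }
  Thread.sleep(100);
}

// Report FAILED to the ResourceManager if any container failed.
rmClient.unregisterApplicationMaster(
    anyFailed ? FinalApplicationStatus.FAILED : FinalApplicationStatus.SUCCEEDED,
    "", "");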
Alternatively, what if I drop the AppMaster entirely and instead run an ordinary Java program, say one that just writes some data to HDFS, like this:
package com.hortonworks.simpleyarnapp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.io.Closeables;

public class Work {

  /**
   * @param args
   */
  public static void main(String[] args) {
    String info = "first argument is:" + args[0] + "\n"
        + "second argument is :" + args[1];
    System.out.println("--------------------------------" + info);
    Configuration conf = new YarnConfiguration();
    conf.set("fs.defaultFS", "hdfs://node31:9000");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "node31:8032");
    writeString(info, conf);
  }

  private static void writeString(String value, Configuration conf) {
    Path path = new Path("hdfs://node31:9000/input/work.info");
    FileSystem fs;
    FSDataOutputStream out = null;
    try {
      fs = FileSystem.get(path.toUri(), conf);
      out = fs.create(path);
      out.writeUTF(value);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      Closeables.closeQuietly(out);
    }
  }
}
Then change the Client's launch command to:
amContainer.setCommands(
    Collections.singletonList(
        "$JAVA_HOME/bin/java" +
        " -Xmx256M" +
        " com.hortonworks.simpleyarnapp.Work" +
        /* " com.hortonworks.simpleyarnapp.ApplicationMaster" + */
        " " + command +
        " " + String.valueOf(n) +
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
        )
    );

This does run Work, and its logic executes correctly (the data is indeed written to HDFS), but the job state comes back as FAILED, even though submission clearly works. Presumably this is because Work never calls registerApplicationMaster, so the ResourceManager never hears from the AM container it launched and marks the attempt as failed.
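Under that reading, a hypothetical variant of Work.main() (my sketch, not from the original post, using the same imports as ApplicationMaster) could register with the RM itself, since Work runs inside the AM container and the NodeManager sets the attempt-id environment variables the AMRMClient needs:

// Sketch: body of Work.main() after building conf, assuming Work is launched
// as the AM container so AMRMClient can find its attempt id in the environment.
AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
rmClient.init(conf);
rmClient.start();
rmClient.registerApplicationMaster("", 0, ""); // host/port/tracking URL unused here
writeString(info, conf);                       // do the actual work
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
rmClient.stop();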
One more point: if I instead keep the AppMaster but change its container command to a java invocation that runs my Work class, that should also work in principle. In practice, though, the task is reported as successful while Work's logic never executes (nothing is written to HDFS). Presumably the worker containers are launched without the application jar as a local resource and without any CLASSPATH in their environment, so the java command fails immediately, and, as discussed above, this AM ignores the containers' exit codes and still reports SUCCEEDED.
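Under that assumption, here is a sketch of how the AppMaster could ship the jar and a CLASSPATH to its worker containers, mirroring what the Client does for the AM. The helper name createWorkerContext and the idea of passing jarPath to the AM as an extra argument are my own additions; the imports are the same ones Client uses (FileStatus, FileSystem, Path, LocalResource*, ConverterUtils, HashMap, Map, IOException):

// Hypothetical helper for ApplicationMaster (not in the original post):
// builds a worker ContainerLaunchContext that ships the application jar
// and a Linux-style CLASSPATH, mirroring Client.setupAppMasterJar()/-Env().
private static ContainerLaunchContext createWorkerContext(
    Configuration conf, Path jarPath) throws IOException {
  ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);

  // Ship the application jar to the worker container as a local resource.
  FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
  LocalResource appJar = Records.newRecord(LocalResource.class);
  appJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
  appJar.setSize(jarStat.getLen());
  appJar.setTimestamp(jarStat.getModificationTime());
  appJar.setType(LocalResourceType.FILE);
  appJar.setVisibility(LocalResourceVisibility.PUBLIC);
  ctx.setLocalResources(Collections.singletonMap("simpleapp.jar", appJar));

  // Build a CLASSPATH joined with ":" (containers run on Linux), as in Client.
  StringBuilder classpath = new StringBuilder();
  for (String c : conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
    if (classpath.length() > 0) {
      classpath.append(':');
    }
    classpath.append(c.trim());
  }
  classpath.append(":$PWD/*");
  Map<String, String> env = new HashMap<String, String>();
  env.put("CLASSPATH", classpath.toString());
  ctx.setEnvironment(env);

  // Launch Work instead of an arbitrary shell command
  // ("arg0 arg1" stand in for whatever arguments Work expects).
  ctx.setCommands(Collections.singletonList(
      "$JAVA_HOME/bin/java -Xmx128M com.hortonworks.simpleyarnapp.Work"
      + " arg0 arg1"
      + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
      + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
  return ctx;
}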
Share, grow, be happy.

If you repost, please credit the original blog: http://blog.csdn.net/fansy1990