MapReduce Core Design (3): Analysis of the Job Submission and Initialization Process

Three Components

  • JobClient (prepares the runtime environment)
  • JobTracker (receives the job)
  • TaskScheduler (initializes the job)
    Note: the book this is based on covers Hadoop 1.x. Hadoop 2.x already uses YARN for resource management, so the JobTracker and TaskTracker no longer exist.

Comparison of the Old and New Hadoop MapReduce Frameworks
1. The client side is unchanged: its APIs and interfaces remain largely compatible, so the change is transparent to developers and existing code needs no major modification. However, the JobTracker and TaskTracker of the old framework are gone, replaced by three components: ResourceManager, ApplicationMaster, and NodeManager (see the sketch after this list).

2. The ResourceManager is a central service. Its job is to schedule and launch the ApplicationMaster of each job, and to monitor whether each ApplicationMaster is alive. Monitoring and restarting the tasks inside a job is no longer its concern; that is exactly why the ApplicationMaster exists. The ResourceManager is responsible for job and resource scheduling: it receives the jobs submitted by the JobSubmitter and, based on each job's context information and the status information collected from the NodeManagers, starts the scheduling process and allocates a Container in which the ApplicationMaster runs.

3. The NodeManager has a narrow responsibility: it maintains Container state and sends heartbeats to the ResourceManager.

4. The ApplicationMaster is responsible for all the work within one job's lifecycle, much like the JobTracker in the old framework. Note that every job (not every kind of job) has its own ApplicationMaster, and it may run on a machine other than the one hosting the ResourceManager.
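
Point 1 can be seen directly from the client side: a driver written against the org.apache.hadoop.mapreduce API looks the same whether the cluster runs the classic JobTracker or YARN. Below is a minimal sketch under that assumption; no Mapper/Reducer is set, so the identity defaults are used, and the input/output paths come from the command line.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "new-api-demo");
    job.setJarByClass(NewApiDriver.class);
    // No Mapper/Reducer set: the identity Mapper and Reducer are used by default.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Whether this job ends up on a classic JobTracker (1.x) or on YARN (2.x)
    // is decided by the cluster configuration, not by this client code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}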

Four Steps

  • The user submits a job.
  • Based on the job configuration (JobConf), JobClient uploads the files needed to run the job to a directory in the JobTracker's file system (usually HDFS, whose files are shared by all nodes).
  • JobClient submits the job to the JobTracker through an RPC interface.
  • After receiving the job, the JobTracker hands it over to the TaskScheduler, which initializes it (a minimal driver sketch illustrating these steps from the user's side follows).
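
On the user side, these four steps hide behind a classic (old mapred API) driver such as the following minimal sketch; the paths come from the command line and the default identity Mapper/Reducer are used.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class OldApiDriver {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(OldApiDriver.class); // job configuration: Mapper/Reducer classes, etc.
    conf.setJobName("old-api-demo");
    conf.setNumReduceTasks(2);                      // example Reduce Task count
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // Steps 2-4: JobClient uploads the job files to HDFS, submits the job to the
    // JobTracker over RPC, and then polls its progress until it completes.
    JobClient.runJob(conf);
  }
}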

Details

Job Submission Process

Executing the shell command
After writing the program, the user packages it into a jar and runs the hadoop jar command to submit the job. Submission is handled by the RunJar class: its main function unpacks the jar and sets up the environment variables, then passes the run arguments to the MapReduce program and runs it (a self-contained sketch of this flow follows the two excerpts below).

/**
   * Unpack a jar file into a directory.
   *
   * This version unpacks all files inside the jar regardless of filename.
   *
   * @param jarFile the .jar file to unpack
   * @param toDir the destination directory into which to unpack the jar
   *
   * @throws IOException if an I/O error has occurred or toDir
   * cannot be created and does not already exist
   */
  public static void unJar(File jarFile, File toDir) throws IOException {
    unJar(jarFile, toDir, MATCH_ANY);
  }
/**
   * Creates a classloader based on the environment that was specified by the
   * user. If HADOOP_USE_CLIENT_CLASSLOADER is specified, it creates an
   * application classloader that provides the isolation of the user class space
   * from the hadoop classes and their dependencies. It forms a class space for
   * the user jar as well as the HADOOP_CLASSPATH. Otherwise, it creates a
   * classloader that simply adds the user jar to the classpath.
   */
  private ClassLoader createClassLoader(File file, final File workDir)
      throws MalformedURLException {
    ClassLoader loader;
    // see if the client classloader is enabled
    if (useClientClassLoader()) {
      StringBuilder sb = new StringBuilder();
      sb.append(workDir).append("/").
          append(File.pathSeparator).append(file).
          append(File.pathSeparator).append(workDir).append("/classes/").
          append(File.pathSeparator).append(workDir).append("/lib/*");
      // HADOOP_CLASSPATH is added to the client classpath
      String hadoopClasspath = getHadoopClasspath();
      if (hadoopClasspath != null && !hadoopClasspath.isEmpty()) {
        sb.append(File.pathSeparator).append(hadoopClasspath);
      }
      String clientClasspath = sb.toString();
      // get the system classes
      String systemClasses = getSystemClasses();
      List<String> systemClassesList = systemClasses == null ?
          null :
          Arrays.asList(StringUtils.getTrimmedStrings(systemClasses));
      // create an application classloader that isolates the user classes
      loader = new ApplicationClassLoader(clientClasspath,
          getClass().getClassLoader(), systemClassesList);
    } else {
      List<URL> classPath = new ArrayList<>();
      classPath.add(new File(workDir + "/").toURI().toURL());
      classPath.add(file.toURI().toURL());
      classPath.add(new File(workDir, "classes/").toURI().toURL());
      File[] libs = new File(workDir, "lib").listFiles();
      if (libs != null) {
        for (File lib : libs) {
          classPath.add(lib.toURI().toURL());
        }
      }
      // create a normal parent-delegating classloader
      loader = new URLClassLoader(classPath.toArray(new URL[classPath.size()]));
    }
    return loader;
  }
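
To put unJar and createClassLoader in context, here is a self-contained sketch of what happens after "hadoop jar" is executed. It is an illustration only, not the actual Hadoop source: the real RunJar.run() also validates its arguments, honours HADOOP_USE_CLIENT_CLASSLOADER, creates the working directory under hadoop.tmp.dir, and deletes it through a shutdown hook.

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import org.apache.hadoop.util.RunJar;

public class RunJarSketch {
  public static void main(String[] args) throws Throwable {
    int firstArg = 0;
    File jarFile = new File(args[firstArg++]);

    // 1. Determine the main class from the jar manifest's Main-Class attribute,
    //    falling back to the next command-line argument ("hadoop jar <jar> <class> ...").
    String mainClassName = null;
    try (JarFile jar = new JarFile(jarFile)) {
      Manifest manifest = jar.getManifest();
      if (manifest != null) {
        mainClassName = manifest.getMainAttributes().getValue("Main-Class");
      }
    }
    if (mainClassName == null) {
      mainClassName = args[firstArg++];
    }

    // 2. Unpack the jar into a temporary working directory.
    File workDir = Files.createTempDirectory("hadoop-unjar").toFile();
    RunJar.unJar(jarFile, workDir);

    // 3. Build a classloader over the work dir, the jar, classes/ and lib/*
    //    (this mirrors the non-isolated branch of createClassLoader above).
    List<URL> cp = new ArrayList<>();
    cp.add(workDir.toURI().toURL());
    cp.add(jarFile.toURI().toURL());
    cp.add(new File(workDir, "classes/").toURI().toURL());
    File[] libs = new File(workDir, "lib").listFiles();
    if (libs != null) {
      for (File lib : libs) {
        cp.add(lib.toURI().toURL());
      }
    }
    ClassLoader loader = new URLClassLoader(cp.toArray(new URL[0]));
    Thread.currentThread().setContextClassLoader(loader);

    // 4. Load the user's main class through that classloader and reflectively
    //    invoke its main() with the remaining command-line arguments.
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", String[].class);
    main.invoke(null, (Object) Arrays.copyOfRange(args, firstArg, args.length));
  }
}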

The user's MapReduce program configures all of the job's runtime information (the Mapper class, the Reducer class, the number of Reduce Tasks, and so on), and finally calls JobClient.runJob in its main function to submit the job to the JobTracker.

 /** 
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }
 /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                  IOException {
    return submitJobInternal(conf);
  }

public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
        @Override
        public Job run() throws IOException, ClassNotFoundException, 
          InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });

      Cluster prev = cluster;
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();

      // It is important to close the previous cluster instance
      // to cleanup resources.
      if (prev != null) {
        prev.close();
      }
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

Uploading the Job Files

Before JobClient submits the job to the JobTracker, it has to do some preparatory work, including obtaining a job ID, creating HDFS directories, uploading the job files, and generating the split files; all of this is implemented by submitJobInternal(job).
Both the upload and the download of MapReduce job files are handled by the DistributedCache tool, and the whole workflow is transparent to the user. The files are uploaded to HDFS on the JobClient side.
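
Besides the framework's own use of it for the jar, configuration and split files, the DistributedCache can also be used explicitly by applications to ship side files to every task. A minimal sketch using the Hadoop 2.x Job API follows; the HDFS paths are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class CacheFileExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "cache-file-demo");
    job.setJarByClass(CacheFileExample.class);

    // Ship a lookup file that already resides in HDFS to every task;
    // a task can later locate it through context.getCacheFiles().
    job.addCacheFile(new URI("/data/lookup.txt"));            // placeholder HDFS path

    // Add an extra jar to the tasks' classpath through the same mechanism.
    job.addFileToClassPath(new Path("/libs/extra-lib.jar"));  // placeholder HDFS path

    // ... set input/output paths and Mapper/Reducer, then submit as usual.
  }
}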

Generating the InputSplit Files
After the MapReduce job is submitted, JobClient calls the getSplits method of the InputFormat to generate the InputSplit information, which consists of two parts (see the sketch after this list):

  • InputSplit metadata: used by the JobTracker to build the data structures related to task locality.
  • Raw InputSplit information: used when a Map Task is initialized to determine which data it should process.
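
As a concrete illustration of the first step, the sketch below calls getSplits directly through the Hadoop 2.x mapreduce API and prints each split's length and host list; the input path is a placeholder.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitInspector {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-inspector");
    FileInputFormat.addInputPath(job, new Path("/input/data"));   // placeholder path

    // This is essentially what the submission code asks the configured
    // InputFormat to do: compute the splits for the job's input.
    List<InputSplit> splits = new TextInputFormat().getSplits(job);
    for (InputSplit split : splits) {
      // getLength(): data length of the split; getLocations(): hosts where it is local.
      System.out.println(split.getLength() + " bytes on "
          + Arrays.toString(split.getLocations()));
    }
  }
}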

InputSplit handling mainly involves three classes: JobSplit, JobSplitWriter, and SplitMetaInfoReader.

[Figure: the three inner classes of JobSplit]
JobSplit wraps the basic classes used for reading and writing InputSplit information, namely the three classes shown in the figure above.

SplitMetaInfo: stored in the file job.splitmetainfo; during job initialization the JobTracker reads this file in order to create the Map Tasks.

/**
   * This represents the meta information about the task split.
   * The main fields are 
   *     - start offset in actual split
   *     - data length that will be processed in this split
   *     - hosts on which this split is local
   */
  public static class SplitMetaInfo implements Writable {
    private long startOffset;
    private long inputDataLength;
    private String[] locations;

    public SplitMetaInfo() {}
    
    public SplitMetaInfo(String[] locations, long startOffset, 
        long inputDataLength) {
      this.locations = locations;
      this.startOffset = startOffset;
      this.inputDataLength = inputDataLength;
    }
    
    public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
      try {
        this.locations = split.getLocations();
        this.inputDataLength = split.getLength();
        this.startOffset = startOffset;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

TaskSplitMetaInfo: the data structure that holds the InputSplit meta information. The JobTracker obtains this information from the file job.splitmetainfo when the job is initialized. The host list is an important factor for the task scheduler when it decides whether a task has data locality, while splitIndex records where, inside the file job.split, the location information of the data the new task must process is stored; after receiving this information, the TaskTracker can read the InputSplit from the job.split file and then run the new task.

/**
   * This represents the meta information about the task split that the 
   * JobTracker creates
   */
  public static class TaskSplitMetaInfo {
    private TaskSplitIndex splitIndex; // index of the split information in the job.split file
    private long inputDataLength; // data length of the InputSplit
    private String[] locations; // hosts on which the InputSplit resides
    public TaskSplitMetaInfo(){
      this.splitIndex = new TaskSplitIndex();
      this.locations = new String[0];
    }
    public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations, 
        long inputDataLength) {
      this.splitIndex = splitIndex;
      this.locations = locations;
      this.inputDataLength = inputDataLength;
    }
    public TaskSplitMetaInfo(InputSplit split, long startOffset) 
    throws InterruptedException, IOException {
      this(new TaskSplitIndex("", startOffset), split.getLocations(), 
          split.getLength());
    }
    
    public TaskSplitMetaInfo(String[] locations, long startOffset, 
        long inputDataLength) {
      this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
    }
    
    public TaskSplitIndex getSplitIndex() {
      return splitIndex;
    }
    
    public String getSplitLocation() {
      return splitIndex.getSplitLocation();
    }
    public long getInputDataLength() {
      return inputDataLength;
    }
    public String[] getLocations() {
      return locations;
    }
    public long getStartOffset() {
      return splitIndex.getStartOffset();
    }
  }

TaskSplitIndex: when the JobTracker assigns a new task to a TaskTracker, TaskSplitIndex is used to indicate where, inside the job.split file, the location information of the data the new task must process is stored.

/**
   * This represents the meta information about the task split that the 
   * task gets
   */
  public static class TaskSplitIndex {
    private String splitLocation; // location of the job.split file
    private long startOffset; // offset of the InputSplit information within the job.split file
    public TaskSplitIndex(){
      this("", 0);
    }
    public TaskSplitIndex(String splitLocation, long startOffset) {
      this.splitLocation = splitLocation;
      this.startOffset = startOffset;
    }
    public long getStartOffset() {
      return startOffset;
    }
    public String getSplitLocation() {
      return splitLocation;
    }
    public void readFields(DataInput in) throws IOException {
      splitLocation = Text.readString(in);
      startOffset = WritableUtils.readVLong(in);
    }
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, splitLocation);
      WritableUtils.writeVLong(out, startOffset);
    }
  }

Submitting the Job to the JobTracker
JobClient calls the RPC method submitJob to submit the job to the JobTracker. Inside JobTracker.submitJob, the following steps are performed in order (no source is shown for this part, because the source I was reading is Hadoop 2.x):

  1. Create a JobInProgress object for the job: this object maintains the job's runtime information and is mainly used to track the state and progress of the running job.
  2. Check whether the user is allowed to submit jobs to the specified queue: Hadoop manages jobs and resources in units of queues; each queue is allocated a certain amount of resources, and every user belongs to one or more queues and may only use the resources of the queues it belongs to.
  3. Check whether the memory usage configured for the job is reasonable.
  4. Notify the TaskScheduler to initialize the job: after receiving a job, the JobTracker does not initialize it immediately but hands it to the scheduler, which initializes jobs according to its own policy.

The JobTracker uses the observer design pattern to notify the TaskScheduler of the "new job submitted" event, as sketched below.
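
A minimal sketch of this observer-style notification, loosely modeled on Hadoop 1.x's JobInProgressListener mechanism; the types below are simplified stand-ins for illustration, not the actual Hadoop classes.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified stand-in for the job's runtime information.
class JobInProgress {
  final String jobId;
  JobInProgress(String jobId) { this.jobId = jobId; }
}

// Observer interface: the scheduler registers an implementation with the JobTracker.
interface JobInProgressListener {
  void jobAdded(JobInProgress job);
}

// Subject: on submitJob(), the JobTracker notifies every registered listener.
class JobTrackerSketch {
  private final List<JobInProgressListener> listeners = new CopyOnWriteArrayList<>();

  void addJobInProgressListener(JobInProgressListener listener) {
    listeners.add(listener);
  }

  void submitJob(String jobId) {
    JobInProgress jip = new JobInProgress(jobId);   // step 1: create the runtime object
    // ... the queue ACL and memory checks of steps 2 and 3 would go here
    for (JobInProgressListener listener : listeners) {
      listener.jobAdded(jip);                       // step 4: tell the scheduler
    }
  }
}

// Observer: the task scheduler decides when the job is actually initialized.
class TaskSchedulerSketch implements JobInProgressListener {
  @Override
  public void jobAdded(JobInProgress job) {
    System.out.println("Queueing initialization for " + job.jobId);
  }
}

public class ObserverDemo {
  public static void main(String[] args) {
    JobTrackerSketch jobTracker = new JobTrackerSketch();
    jobTracker.addJobInProgressListener(new TaskSchedulerSketch());
    jobTracker.submitJob("job_201501010000_0001");
  }
}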

Job Initialization Process

The main work of initialization is to construct the Map Tasks and Reduce Tasks and to initialize them.
Hadoop breaks every job down into four types of tasks: Setup Task, Map Task, Reduce Task, and Cleanup Task.
Their runtime information is all maintained by the TaskInProgress class, so creating these tasks really means creating TaskInProgress objects.

Setup Task: the marker task for job initialization.

It performs simple initialization work; once it completes, the Map Tasks start running. This task type is split into Map Setup Task and Reduce Setup Task; each job has one of each, and at runtime they occupy a Map slot and a Reduce slot respectively. The two are functionally identical, and only one of them runs at any given time.

Map Task: the task that processes data in the Map phase. The number of Map Tasks, and the data split each of them processes, are determined by the InputFormat of the application.

Reduce Task: the task that processes data in the Reduce phase. Its number is specified by the user through the parameter mapred.reduce.tasks. Hadoop schedules only Map Tasks at first, and starts scheduling Reduce Tasks only after the proportion of completed Map Tasks reaches a certain threshold (5% by default), as shown in the configuration sketch below.
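
Both knobs can be set programmatically on the old-API JobConf; the slow-start key and its 0.05 default below are the Hadoop 1.x names (in 2.x the key is mapreduce.job.reduce.slowstart.completedmaps).

import org.apache.hadoop.mapred.JobConf;

public class ReduceTuning {
  public static void main(String[] args) {
    JobConf conf = new JobConf();

    // Number of Reduce Tasks (the mapred.reduce.tasks parameter).
    conf.setNumReduceTasks(4);

    // Start scheduling Reduce Tasks once 50% of the Map Tasks have finished,
    // instead of the default 5% (0.05).
    conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.50f);

    System.out.println("reduces = " + conf.getNumReduceTasks());
  }
}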

Cleanup Task: the marker task for job completion. It does the job-cleanup work, such as deleting the temporary directories used while the job was running. Once this task finishes, the job moves from the RUNNING state to the SUCCEEDED state.
