Hadoop随笔(一):工作流程的源码

一、几个可能会用到的属性值

  1、mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution

  这两个属性可以决定Map任务和Reduce任务是否开启推测式执行策略。推测式执行策略在Hadoop中用来应对执行缓慢的任务所造成的瓶颈,但是对代码缺陷所导致的任务执行过慢,推测执行是一种反向的作用,应当避免,而Hadoop默认是开启推测式执行的。

  2、mapred.job.reuse.jvm.num.tasks

  这个属性值默认为1,表示一个JVM上只运行一个任务。如果Task Tracker具有很多小任务,重复启动JVM是非常耗时的,可以考虑任务JVM重用。将属性值设置为大于1的值表示启用了JVM;设置为-1表示共享此JVM的任务数目不受限制。尽管启用了JVM重用,但是Task Tracker上的多个任务还是顺序执行的,只是不需要额外启动JVM而已。

  3、Task Tracker的本地参数值

名称 类型 描述
mapred.job.id String job id
mapred.jar String job目录下job.jar的位置
job.local.dir String job指定的共享存储空间
mapred.tip.id String task id
mapred.task.id String task尝试id
mapred.task.partition int task在job中的id
map.input.file String map读取的文件名
map.input.start long map输入的数据块的起始位置的偏移
map.input.length long map输入的数据块的字节数
mapred.work.output.dir String task临时输出目录

 二、Hadoop工作流程

  以下Hadoop源码来源于hadoop-1.2.1。

  1、作业的提交

  hadoop项目的第一步自然是根据需求进行代码编写,而后是需要配置Map类、Reduce类、Input路径、Output路径以及其他的虚拟机配置等。但这些操作只是为hadoop形成了一个作业,还算不上Hadoop的工作流程,将项目提交执行后,hadoop进入完全自动的运行方式,工作流程才开始启动。

  作业的提交代码实现于JobClient类(JobClient.java)的submitJobInternal方法,分为以下几个步骤:

  a.获取Job的ID

    JobID jobId = jobSubmitClient.getNewJobId();

  b.分配job在HDFS中的资源空间,配置其路径

    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());    //jobCopy是拷贝的一个job副本
    JobStatus status = null;

  c.监控地址令牌,直到可以对该空间进行操作。

    TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),new Path [] {submitJobDir},jobCopy);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
int reduces = jobCopy.getNumReduceTasks();
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
job.setJobSubmitHostAddress(ip.getHostAddress());
job.setJobSubmitHostName(ip.getHostName());
}
JobContext context = new JobContext(jobCopy, jobId);

  d.检查output类型

    if (reduces == 0 ? jobCopy.getUseNewMapper() :
jobCopy.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
ReflectionUtils.newInstance(context.getOutputFormatClass(),
jobCopy);
output.checkOutputSpecs(context);
} else {
jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
}

  e.为job划分splits

    FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);

  f.在job的配置文件中写入相关的排队、配置信息

    String queue = jobCopy.getQueueName();
AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
jobCopy.set(QueueManager.toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

  g.将job的信息通知给JobTrack

    FSDataOutputStream out = FileSystem.create(fs, submitJobFile,new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

  f.将job写入到HDFS中。需要注意的是,写入job之前要先将cache中的数据清除。这样做的原因:cache中的数据对当前job是没有用的了,如果在job写入之前清除是没有影响的;但是,如果在写入之后清除,cache中的很可能已经被更改为其他job的信息了,清除cache中的数据会破坏其他的job。

    TokenCache.cleanUpTokenReferral(jobCopy);
try {
jobCopy.writeXml(out);
} finally {
out.close();
}

  g.调用submitJob类来真正的提交job

    printTokens(jobId, jobCopy.getCredentials());
    status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());
    JobProfile prof = jobSubmitClient.getJobProfile(jobId);
    if (status != null && prof != null) {
return new NetworkedJob(status, prof, jobSubmitClient);
} else {
throw new IOException("Could not launch job");
}

  至此,一个作业就被完整地提交给JobTrack和HDFS了。

  2、作业的初始化

  作业的初始化主要是考虑到map和reduce都需要进行初始化,因此需要注意其中的细节。

  作业的初始化源码实现于JobInprogress类(JobInProgress.java)的initTask方法。作业初始化的步骤如下:

  a.首先需要获取job的一些配置信息和运行信息

    if (!jobtracker.getConf().getBoolean(JT_JOB_INIT_EXCEPTION_OVERRIDE, false) && getJobConf().getBoolean(JOB_INIT_EXCEPTION, false)) {
   waitForInitWaitLockForTests();
  }
    if (tasksInited || isComplete()) {
      return;
    }
    synchronized(jobInitKillStatus){    //互锁操作
      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
        return;
    }
    jobInitKillStatus.initStarted = true;
    }

  b.获取job的执行信息和优先级信息

  c.根据input splits的数目决定map的数目,每一个splits都需要一个map

    TaskSplitMetaInfo[] splits = createSplits(jobId);
    if (numMapTasks != splits.length) {
    throw new IOException("Number of maps in JobConf doesn't match number of " +
"recieved splits for job " + jobId + "! " +
"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
    }
    numMapTasks = splits.length;

  d.检查splits的位置,保证创建的map/reduce或者初始化的map/reduce都是有意义的,同时还需要在监控进程中设置map和reduce的信息

    for (TaskSplitMetaInfo split : splits) {
      NetUtils.verifyHostnames(split.getLocations());
    }
    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

  e.为splits分配map

    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i, numSlotsPerMap);
    }
    LOG.info("Input size for job " + jobId + " = " + inputLength+ ". Number of splits = " + splits.length);

  f.将map放入到等待执行的缓冲区内

    localityWaitFactor = conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
    if (numMapTasks > 0) {
      nonRunningMapCache = createCache(splits, maxLevel);
    }
    // set the launch time
    this.launchTime = jobtracker.getClock().getTime();

  g.同样的对待reduce操作

  h.创建两个清除进程,一个清除map,一个清除reduce

    cleanup = new TaskInProgress[2];
    // cleanup map tip. This map doesn't use any splits. Just assign an empty
    // split.
    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1);
    cleanup[0].setJobCleanupTask();
    // cleanup reduce tip.
    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this, 1);
    cleanup[1].setJobCleanupTask();

  i.创建两个设置进程,分别对map和reduce进行初始化设置

    setup = new TaskInProgress[2];
    // setup map tip. This map doesn't use any split. Just assign an empty
    // split.
    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1);
    setup[0].setJobSetupTask();
     // setup reduce tip.
    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this, 1);
    setup[1].setJobSetupTask();

  j.最后,设置一个互锁的方法,用来检查初始化是否成功,并写入日志

    synchronized(jobInitKillStatus){
      jobInitKillStatus.initDone = true;
       // set this before the throw to make sure cleanup works properly
      tasksInited = true;
      if(jobInitKillStatus.killed) {
        throw new KillInterruptedException("Job " + jobId + " killed in init");
      }
    }
    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, numMapTasks, numReduceTasks);

  至此,作业的初始化操作就已经完成了。由于初始化操作需要根据splits数目来确定需要初始化的map和reduce操作,因此相比较有点复杂。

  3、作业的分配

  作业的分配需要JobTrack和TaskTrack相互协调,JobTrack根据TaskTrack发送的心跳信息来决定任务的分配方法。心跳信息会报告当前任务的状况,会提出新的任务请求,或者提出任务执行失败的报告等。JobTrack接受到信息后会采取相应的措施。

  Hadoop的作业分配是一种“拉”的方式,TaskTrack发送心跳信息(源码实现为TaskTrack类(TaskTrack.java)的transmitHeartBeat方法)给JobTrack来请求一个新的任务,而这个TaskTrack提出任务请求时也会提供自己当前map/reduce任务槽的数量供JobTrack参考。JobTrack根据心跳信息(源码实现为JobTrack类(JobTrack.java)的heartbeat方法)在本地优先的情况下对该TaskTrack分配Task。

  4、作业的执行

  作业的执行同样在TaskTrack中完成, 在作业执行时,第一步是将任务进行本地化,第二步是通过虚拟机执行任务。

  任务本地化,基于TaskTrack.java的localizeJob方法实现,主要步骤为:

  a.将job.split复制到本地;b.将job.jar复制到本地;c.将job的配置信息写入job.xml;d.创建本地目录,解压缩job.jar;e.调用launchTaskForJob方法发布任务;f.调用launchTask方法启动任务。

  任务的执行包括map执行(MapTaskRunner类)和reduce执行(ReduceTaskRunner类),每个任务的执行都是通过一个JVM实现的。

  5、作业的进度

  作业进度的更新依赖于TaskTrack向JobTrack发送的心跳信息。每个TaskTrack都会将自己的进度信息和状态信息封装在心跳信息中,每隔5秒向JobTrack发送一次。JobTrack在收集所有的信息后统一并得出全局信息。

  

上一篇:Spring Hibernate Transaction示例


下一篇:web开发工具软件使用问题记录