- JobClient (prepares the runtime environment)
- JobTracker (receives the job)
- TaskTracker (initializes the job)
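Comparing the old and new Hadoop MapReduce frameworks
1. The client is unchanged, and most of its APIs and interfaces remain compatible. This keeps the change transparent to developers, so existing code needs no major modification. The JobTracker and TaskTracker of the original framework are gone, however, replaced by three components: ResourceManager, ApplicationMaster, and NodeManager.
2. The ResourceManager is a central service. It schedules and launches the ApplicationMaster that belongs to each job, and it monitors whether each ApplicationMaster is still alive. Monitoring and restarting the tasks inside a job is no longer its concern, which is the reason the ApplicationMaster exists. The ResourceManager handles job and resource scheduling: it accepts the jobs submitted by JobSubmitter and, based on the job's context information and the status information collected from the NodeManagers, starts the scheduling process and allocates a Container to serve as the ApplicationMaster.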
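- The user submits a job
- Following the job configuration (JobConf), JobClient uploads the files the job needs to a directory in the file system used by the JobTracker (usually HDFS, whose files are shared by all nodes)
- JobClient calls an RPC interface to submit the job to the JobTracker
- After receiving the job, the JobTracker notifies the TaskScheduler, which initializes the job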
    /**
     * Unpack a jar file into a directory.
     *
     * This version unpacks all files inside the jar regardless of filename.
     *
     * @param jarFile the .jar file to unpack
     * @param toDir the destination directory into which to unpack the jar
     *
     * @throws IOException if an I/O error has occurred or toDir
     * cannot be created and does not already exist
     */
    public static void unJar(File jarFile, File toDir) throws IOException {
      unJar(jarFile, toDir, MATCH_ANY);
    }
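To make the unpacking step concrete, here is a minimal sketch that uses only the JDK and is not Hadoop's implementation. The class `UnJarSketch`, its `demo` helper, and the sample entry names are hypothetical. The `MATCH_ANY` pattern mirrors the constant named in the excerpt, and the check that rejects entries escaping the target directory is an assumption about what a careful implementation would do.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.regex.Pattern;

final class UnJarSketch {
    /** Matches every entry name, so the whole jar is unpacked. */
    static final Pattern MATCH_ANY = Pattern.compile(".*");

    /** Unpacks the entries whose names match the filter; returns how many files were written. */
    static int unJar(Path jarFile, Path toDir, Pattern filter) throws IOException {
        Path root = toDir.toAbsolutePath().normalize();
        Files.createDirectories(root);
        int written = 0;
        try (JarFile jar = new JarFile(jarFile.toFile())) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                if (entry.isDirectory() || !filter.matcher(entry.getName()).matches()) {
                    continue;
                }
                Path target = root.resolve(entry.getName()).normalize();
                // Assumed safety check: refuse names such as "../x" that leave the target directory.
                if (!target.startsWith(root)) {
                    throw new IOException("Entry escapes target directory: " + entry.getName());
                }
                Files.createDirectories(target.getParent());
                try (InputStream in = jar.getInputStream(entry)) {
                    Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
                }
                written++;
            }
        }
        return written;
    }

    /** Builds a two-entry jar in a temp location, unpacks it, and returns the number of files written. */
    static int demo(Pattern filter) {
        try {
            Path jar = Files.createTempFile("sketch", ".jar");
            try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar))) {
                for (String name : new String[] {"classes/App.txt", "lib/dep.txt"}) {
                    out.putNextEntry(new JarEntry(name));
                    out.write(name.getBytes(StandardCharsets.UTF_8));
                    out.closeEntry();
                }
            }
            return unJar(jar, Files.createTempDirectory("unjar"), filter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(MATCH_ANY) + " files unpacked");
    }
}
```

Passing a narrower pattern, for example `Pattern.compile("lib/.*")`, unpacks only the matching entries, which is the role the three-argument overload plays in the excerpt.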
    /**
     * Creates a classloader based on the environment that was specified by the
     * user. If HADOOP_USE_CLIENT_CLASSLOADER is specified, it creates an
     * application classloader that provides the isolation of the user class space
     * from the hadoop classes and their dependencies. It forms a class space for
     * the user jar as well as the HADOOP_CLASSPATH. Otherwise, it creates a
     * classloader that simply adds the user jar to the classpath.
     */
    private ClassLoader createClassLoader(File file, final File workDir)
        throws MalformedURLException {
      ClassLoader loader;
      // see if the client classloader is enabled
      if (useClientClassLoader()) {
        StringBuilder sb = new StringBuilder();
        sb.append(workDir).append("/").
            append(File.pathSeparator).append(file).
            append(File.pathSeparator).append(workDir).append("/classes/").
            append(File.pathSeparator).append(workDir).append("/lib/*");
        // HADOOP_CLASSPATH is added to the client classpath
        String hadoopClasspath = getHadoopClasspath();
        if (hadoopClasspath != null && !hadoopClasspath.isEmpty()) {
          sb.append(File.pathSeparator).append(hadoopClasspath);
        }
        String clientClasspath = sb.toString();
        // get the system classes
        String systemClasses = getSystemClasses();
        List<String> systemClassesList = systemClasses == null ?
            null :
            Arrays.asList(StringUtils.getTrimmedStrings(systemClasses));
        // create an application classloader that isolates the user classes
        loader = new ApplicationClassLoader(clientClasspath,
            getClass().getClassLoader(), systemClassesList);
      } else {
        List<URL> classPath = new ArrayList<>();
        classPath.add(new File(workDir + "/").toURI().toURL());
        classPath.add(file.toURI().toURL());
        classPath.add(new File(workDir, "classes/").toURI().toURL());
        File[] libs = new File(workDir, "lib").listFiles();
        if (libs != null) {
          for (File lib : libs) {
            classPath.add(lib.toURI().toURL());
          }
        }
        // create a normal parent-delegating classloader
        loader = new URLClassLoader(classPath.toArray(new URL[classPath.size()]));
      }
      return loader;
    }
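The non-isolated branch above can be tried without Hadoop. The following is a rough sketch using only the JDK; `WorkDirClassPath` and its method names are hypothetical, and including the job jar itself on the classpath is an assumption about the intended layout (working directory, jar, `classes/`, then everything under `lib/`).

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WorkDirClassPath {
    /** Collects workDir/, the jar, workDir/classes/ and each file found in workDir/lib. */
    static List<URL> classPathFor(File jar, File workDir) {
        try {
            List<URL> classPath = new ArrayList<>();
            classPath.add(new File(workDir + "/").toURI().toURL());
            classPath.add(jar.toURI().toURL());
            classPath.add(new File(workDir, "classes/").toURI().toURL());
            File[] libs = new File(workDir, "lib").listFiles();
            if (libs != null) { // listFiles() returns null when lib/ does not exist
                Arrays.sort(libs); // listing order is unspecified, so sort for a stable classpath
                for (File lib : libs) {
                    classPath.add(lib.toURI().toURL());
                }
            }
            return classPath;
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** A plain parent-delegating loader: the parent is asked first, then the URLs above. */
    static URLClassLoader newLoader(File jar, File workDir) {
        List<URL> classPath = classPathFor(jar, workDir);
        return new URLClassLoader(classPath.toArray(new URL[0]));
    }

    public static void main(String[] args) {
        for (URL url : classPathFor(new File("app.jar"), new File("work"))) {
            System.out.println(url);
        }
    }
}
```

Because the loader delegates to its parent first, a class that exists both in the framework and in the user jar resolves to the framework's copy. That conflict is what the isolating `ApplicationClassLoader` branch is meant to avoid.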
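The user's MapReduce program configures everything the job needs at run time (the Mapper class, the Reducer class, the number of Reduce Tasks, and so on). Its main function finally calls JobClient.runJob, which submits the job to the JobTracker.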
    /**
     * Utility that submits a job, then polls for progress until the job is
     * complete.
     *
     * @param job the job configuration.
     * @throws IOException if the job fails
     */
    public static RunningJob runJob(JobConf job) throws IOException {
      JobClient jc = new JobClient(job);
      RunningJob rj = jc.submitJob(job);
      try {
        if (!jc.monitorAndPrintJob(job, rj)) {
          throw new IOException("Job failed!");
        }
      } catch (InterruptedException ie) {
        // restore the interrupt flag so callers can still observe it
        Thread.currentThread().interrupt();
      }
      return rj;
    }
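The submit-then-poll pattern used by runJob can be sketched without a cluster. Everything below is hypothetical: `JobHandle` stands in for `RunningJob`, `FakeJob` simulates a job that finishes after a fixed number of status polls, and `monitor` only approximates what a method like `monitorAndPrintJob` does.

```java
import java.io.IOException;

final class SubmitAndPoll {
    /** Hypothetical stand-in for the handle returned by a submission. */
    interface JobHandle {
        boolean isComplete();
        boolean isSuccessful();
        float progress();
    }

    /** Simulated job: each isComplete() call counts as one status poll. */
    static final class FakeJob implements JobHandle {
        private final int pollsNeeded;
        private final boolean succeeds;
        private int polls;

        FakeJob(int pollsNeeded, boolean succeeds) {
            this.pollsNeeded = pollsNeeded;
            this.succeeds = succeeds;
        }

        public boolean isComplete() { return ++polls >= pollsNeeded; }
        public boolean isSuccessful() { return succeeds; }
        public float progress() { return Math.min(1f, (float) polls / pollsNeeded); }
        int pollCount() { return polls; }
    }

    /** Polls until the job completes; returns true only if it finished successfully. */
    static boolean monitor(JobHandle job, long pollMillis) {
        try {
            while (!job.isComplete()) {
                System.out.printf("progress %.0f%%%n", job.progress() * 100);
                Thread.sleep(pollMillis);
            }
            return job.isSuccessful();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }

    /** Mirrors the shape of runJob: submit, monitor, and fail loudly when the job fails. */
    static JobHandle runJob(JobHandle submitted, long pollMillis) throws IOException {
        if (!monitor(submitted, pollMillis)) {
            throw new IOException("Job failed!");
        }
        return submitted;
    }

    public static void main(String[] args) throws IOException {
        runJob(new FakeJob(3, true), 10);
        System.out.println("job finished");
    }
}
```

The client blocks in this loop, so a caller that wants to keep working after submission would call the submit method directly and check the returned handle on its own schedule.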
    /**
     * Submit a job to the MR system.
     * This returns a handle to the {@link RunningJob} which can be used to track
     * the running-job.
     *
     * @param conf the job configuration.
     * @return a handle to the {@link RunningJob} which can be used to track the
     *         running-job.
     * @throws FileNotFoundException
     * @throws IOException
     */
    public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                    IOException {
      return submitJobInternal(conf);
    }
    public RunningJob submitJobInternal(final JobConf conf)
        throws FileNotFoundException, IOException {
      try {
        conf.setBooleanIfUnset("mapred.mapper.new-api", false);
        conf.setBooleanIfUnset("mapred.reducer.new-api", false);
        Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
          @Override
          public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
            Job job = Job.getInstance(conf);
            // the actual submission happens here, under the client's identity
            job.submit();
            return job;
          }
        });

        Cluster prev = cluster;
        // update our Cluster instance with the one created by Job for submission
        // (we can't pass our Cluster instance to Job, since Job wraps the config
        // instance, and the two configs would then diverge)
        cluster = job.getCluster();

        // It is important to close the previous cluster instance
        // to cleanup resources.
        if (prev != null) {
          prev.close();
        }
        return new NetworkedJob(job);
      } catch (InterruptedException ie) {
        throw new IOException("interrupted", ie);
      }
    }
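- InputSplit metadata: used by the JobTracker to build the data structures related to task locality
- Raw InputSplit information: used by each Map Task during initialization to find the data it has to process

SplitMetaInfo: stored in the file job.splitmetainfo. During job initialization the JobTracker reads this file to create the Map Tasks.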
    /**
     * This represents the meta information about the task split.
     * The main fields are
     *     - start offset in actual split
     *     - data length that will be processed in this split
     *     - hosts on which this split is local
     */
    public static class SplitMetaInfo implements Writable {
      private long startOffset;
      private long inputDataLength;
      private String[] locations;

      public SplitMetaInfo() {}

      public SplitMetaInfo(String[] locations, long startOffset,
          long inputDataLength) {
        this.locations = locations;
        this.startOffset = startOffset;
        this.inputDataLength = inputDataLength;
      }

      public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
        try {
          this.locations = split.getLocations();
          this.inputDataLength = split.getLength();
          this.startOffset = startOffset;
        } catch (InterruptedException ie) {
          throw new IOException(ie);
        }
      }

      // The accessors and the Writable methods (readFields/write) are not shown in this excerpt.
    }
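TaskSplitMetaInfo: the data structure that holds InputSplit metadata. The JobTracker reads this information from the file job.splitmetainfo when the job is initialized. The host list is a key input when the task scheduler decides whether a task is data-local. The splitIndex records where, inside the file job.split, the data location information for a new task is stored. After a TaskTracker receives this information, it can read the InputSplit from job.split and then run the new task.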
    /**
     * This represents the meta information about the task split that the
     * JobTracker creates
     */
    public static class TaskSplitMetaInfo {
      private TaskSplitIndex splitIndex; // position of the split information in the job.split file
      private long inputDataLength;      // data length of the InputSplit
      private String[] locations;        // hosts on which the InputSplit is local

      public TaskSplitMetaInfo(){
        this.splitIndex = new TaskSplitIndex();
        this.locations = new String[0];
      }
      public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations,
          long inputDataLength) {
        this.splitIndex = splitIndex;
        this.locations = locations;
        this.inputDataLength = inputDataLength;
      }
      public TaskSplitMetaInfo(InputSplit split, long startOffset)
          throws InterruptedException, IOException {
        this(new TaskSplitIndex("", startOffset), split.getLocations(),
            split.getLength());
      }
      public TaskSplitMetaInfo(String[] locations, long startOffset,
          long inputDataLength) {
        this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
      }
      public TaskSplitIndex getSplitIndex() {
        return splitIndex;
      }
      public String getSplitLocation() {
        return splitIndex.getSplitLocation();
      }
      public long getInputDataLength() {
        return inputDataLength;
      }
      public String[] getLocations() {
        return locations;
      }
      public long getStartOffset() {
        return splitIndex.getStartOffset();
      }
    }
    /**
     * This represents the meta information about the task split that the
     * task gets
     */
    public static class TaskSplitIndex {
      private String splitLocation; // location of the job.split file
      private long startOffset;     // offset of the InputSplit information inside job.split

      public TaskSplitIndex(){
        this("", 0);
      }
      public TaskSplitIndex(String splitLocation, long startOffset) {
        this.splitLocation = splitLocation;
        this.startOffset = startOffset;
      }
      public long getStartOffset() {
        return startOffset;
      }
      public String getSplitLocation() {
        return splitLocation;
      }
      public void readFields(DataInput in) throws IOException {
        splitLocation = Text.readString(in);
        startOffset = WritableUtils.readVLong(in);
      }
      public void write(DataOutput out) throws IOException {
        Text.writeString(out, splitLocation);
        WritableUtils.writeVLong(out, startOffset);
      }
    }
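The relationship between the two files can be illustrated with a small in-memory model: raw splits are appended to one byte stream (playing the role of job.split), and a separate list keeps each split's starting offset, length, and hosts (playing the role of job.splitmetainfo). This is a sketch under assumptions. `SplitIndexSketch`, `FileRange`, and `MetaEntry` are hypothetical names, and the encoding uses the JDK's `writeUTF`/`writeLong` rather than Hadoop's `Text` and variable-length long formats.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class SplitIndexSketch {
    /** Hypothetical raw split: a byte range of one input file. */
    static final class FileRange {
        final String path;
        final long start;
        final long length;
        FileRange(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    /** Hypothetical meta entry: where the raw split starts in the split file, plus scheduling hints. */
    static final class MetaEntry {
        final long offset;
        final long inputDataLength;
        final String[] hosts;
        MetaEntry(long offset, long inputDataLength, String[] hosts) {
            this.offset = offset;
            this.inputDataLength = inputDataLength;
            this.hosts = hosts;
        }
    }

    private final ByteArrayOutputStream splitFile = new ByteArrayOutputStream();
    private final List<MetaEntry> meta = new ArrayList<>();

    /** Client side: append the raw split and remember the offset at which it starts. */
    void add(FileRange split, String... hosts) {
        try {
            long offset = splitFile.size();
            DataOutputStream out = new DataOutputStream(splitFile);
            out.writeUTF(split.path);
            out.writeLong(split.start);
            out.writeLong(split.length);
            out.flush();
            meta.add(new MetaEntry(offset, split.length, hosts));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    List<MetaEntry> meta() { return meta; }

    /** Task side: jump to the recorded offset and decode exactly one raw split. */
    FileRange read(MetaEntry entry) {
        byte[] bytes = splitFile.toByteArray();
        int from = (int) entry.offset;
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes, from, bytes.length - from))) {
            String path = in.readUTF();
            long start = in.readLong();
            long length = in.readLong();
            return new FileRange(path, start, length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SplitIndexSketch index = new SplitIndexSketch();
        index.add(new FileRange("/in/a.txt", 0, 64), "host1", "host2");
        index.add(new FileRange("/in/b.txt", 64, 32), "host3");
        FileRange second = index.read(index.meta().get(1));
        System.out.println(second.path + " @ " + second.start + " len " + second.length);
    }
}
```

The scheduler only ever needs the small meta list (hosts and lengths) to make locality decisions, while a task needs just its own offset to pull one record out of the larger split file. That division is why the notes describe two separate structures.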
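- Create a JobInProgress object for the job: this object maintains the job's run-time information and is mainly used to track the status and progress of the running job.
- Check whether the user has permission to submit jobs to the specified queue: Hadoop manages jobs and resources by queue. Each queue is allotted a certain amount of resources, and each user belongs to one or more queues and may only use the resources of those queues.
- Check whether the memory usage configured for the job is reasonable
- Notify the TaskScheduler to initialize the job: after receiving a job, the JobTracker does not initialize it immediately. It hands the job to the scheduler, which initializes it according to its own policy.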
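The checks in the list above can be sketched as a small admission routine. This is an illustration, not the JobTracker's code: `AdmissionSketch`, the `Result` values, the per-queue user map, and the single memory limit are all hypothetical simplifications, and a plain job id stands in for the JobInProgress object.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

final class AdmissionSketch {
    enum Result { ACCEPTED, QUEUE_ACCESS_DENIED, INVALID_MEMORY }

    private final Map<String, Set<String>> submittersByQueue; // queue name -> users allowed to submit
    private final long maxMemoryMbPerTask;                    // assumed cluster-wide upper bound
    private final Queue<String> pendingInit = new ArrayDeque<>(); // job ids waiting for the scheduler

    AdmissionSketch(Map<String, Set<String>> submittersByQueue, long maxMemoryMbPerTask) {
        this.submittersByQueue = submittersByQueue;
        this.maxMemoryMbPerTask = maxMemoryMbPerTask;
    }

    /** Runs the admission checks in order and defers initialization to the scheduler. */
    Result submit(String jobId, String user, String queue, long memoryMbPerTask) {
        // Queue permission: the user must be allowed to submit to this queue.
        if (!submittersByQueue.getOrDefault(queue, Set.of()).contains(user)) {
            return Result.QUEUE_ACCESS_DENIED;
        }
        // Memory sanity: the request must be positive and within the configured limit.
        if (memoryMbPerTask <= 0 || memoryMbPerTask > maxMemoryMbPerTask) {
            return Result.INVALID_MEMORY;
        }
        // The job is accepted but not initialized yet; the scheduler decides when.
        pendingInit.add(jobId);
        return Result.ACCEPTED;
    }

    /** Scheduler side: picks the next job to initialize (FIFO in this sketch), or null if none. */
    String nextJobToInitialize() {
        return pendingInit.poll();
    }

    public static void main(String[] args) {
        AdmissionSketch tracker = new AdmissionSketch(Map.of("etl", Set.of("alice")), 2048);
        System.out.println(tracker.submit("job_1", "alice", "etl", 1024));
        System.out.println(tracker.submit("job_2", "bob", "etl", 1024));
        System.out.println("next to initialize: " + tracker.nextJobToInitialize());
    }
}
```

Keeping acceptance and initialization separate means a burst of submissions costs the tracker little up front, while the scheduler's policy controls how many jobs are initialized at once.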
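The main work of initialization is to construct the Map Tasks and Reduce Tasks and initialize them.
Hadoop breaks each job down into four types of task: Setup Task, Map Task, Reduce Task, and Cleanup Task.
Setup Task: a marker task for job initialization.
It performs some simple initialization work, and once it completes, the Map Tasks start running. This task type is divided into a Map Setup Task and a Reduce Setup Task, one of each per job. At run time they occupy one Map slot and one Reduce slot respectively. The two tasks do the same work, and only one of them runs at any given time.
Map Task: the task that processes data in the Map phase. The number of Map Tasks and the data split each one processes are determined by the application's InputFormat.
Reduce Task: the task that processes data in the Reduce phase. Its number is set by the user through the parameter mapred.reduce.tasks. Hadoop schedules only Map Tasks at first, and starts scheduling Reduce Tasks once the number of completed Map Tasks reaches a certain fraction (5% by default).
Cleanup Task: a marker task for the end of the job. It performs cleanup work, such as deleting temporary directories used while the job was running. After this task completes, the job moves from the RUNNING state to the SUCCEEDED state.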
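The rule that Reduce Tasks are held back until a fraction of the Map Tasks has completed comes down to a small calculation. The sketch below assumes the threshold is the fraction multiplied by the total number of maps, rounded up. The class and method names are hypothetical, and the fraction is assumed to come from a setting such as `mapred.reduce.slowstart.completed.maps`.

```java
final class ReduceSlowStart {
    /** Number of finished Map Tasks required before Reduce Tasks may be scheduled. */
    static int requiredCompletedMaps(int totalMaps, double fraction) {
        return (int) Math.ceil(fraction * totalMaps);
    }

    /** True once enough Map Tasks have finished for Reduce Tasks to start. */
    static boolean canScheduleReduces(int finishedMaps, int totalMaps, double fraction) {
        return finishedMaps >= requiredCompletedMaps(totalMaps, fraction);
    }

    public static void main(String[] args) {
        int totalMaps = 100;
        double fraction = 0.05; // the 5% default mentioned in the text
        for (int finished = 3; finished <= 6; finished++) {
            System.out.println(finished + " maps done -> reduces allowed: "
                + canScheduleReduces(finished, totalMaps, fraction));
        }
    }
}
```

Rounding up means that even a small job has to finish at least one Map Task before any reduce is scheduled, as long as the fraction is greater than zero.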