Three components
- JobClient (prepares the runtime environment)
- JobTracker (receives the job)
- TaskScheduler (initializes the job)
Note: the book being followed covers the 1.x version. Hadoop 2.x already uses YARN for management, so JobTracker and TaskTracker no longer exist.
Comparison of the old and new Hadoop MapReduce frameworks
1. The client is essentially unchanged: its APIs and interfaces remain largely compatible, so the change is transparent to developers and existing code needs no major rework. However, the old framework's JobTracker and TaskTracker are gone, replaced by three parts: ResourceManager, ApplicationMaster, and NodeManager.
2. ResourceManager is a central service. It schedules and launches the ApplicationMaster belonging to each Job and monitors whether that ApplicationMaster is still alive. Monitoring and restarting the tasks inside a Job is no longer its concern; that is exactly why the ApplicationMaster exists. The ResourceManager is responsible for scheduling jobs and resources: it receives jobs from the JobSubmitter and, based on each job's context information plus the status reports collected from the NodeManagers, starts the scheduling process and allocates a container in which to run the ApplicationMaster.
3. NodeManager has a narrow role: it maintains container state and keeps sending heartbeats to the RM.
4. ApplicationMaster is responsible for all the work within one Job's lifecycle, similar to the JobTracker in the old framework. Note that every Job (not every kind of job) has its own ApplicationMaster, and it can run on a machine other than the ResourceManager.
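For contrast with the old path, here is a minimal sketch of talking to the ResourceManager through the YARN client API. It is my own illustration of the RM / ApplicationMaster / NodeManager division of labor, not how a MapReduce job is actually submitted (the MapReduce client builds this submission context itself and launches MRAppMaster as the ApplicationMaster); the command and resource numbers are placeholders.
import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class YarnSubmitSketch {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    // Ask the ResourceManager for a new application and fill in its submission context.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
    ctx.setApplicationName("demo-app");

    // Describe the container that will run this application's ApplicationMaster.
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList("/bin/echo hello-from-am")); // placeholder AM command
    ctx.setAMContainerSpec(amContainer);
    ctx.setResource(Resource.newInstance(512, 1)); // memory (MB) and vcores for the AM container

    // The RM allocates a container for the AM; NodeManagers maintain container state and heartbeat to the RM.
    yarnClient.submitApplication(ctx);
    yarnClient.stop();
  }
}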
Four steps
- The user submits the job
- Following the job configuration (JobConf), JobClient uploads the files the job needs to a directory in the JobTracker's file system (usually HDFS, whose files are shared by all nodes)
- JobClient submits the job to the JobTracker through an RPC interface
- After receiving the job, the JobTracker hands it to the TaskScheduler, which initializes it
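To make step 1 concrete, a minimal old-API driver might look like the sketch below. It is my own illustration, using Hadoop's built-in IdentityMapper/IdentityReducer so that it compiles as-is; a real job would plug in its own Mapper and Reducer classes. JobClient.runJob then carries out steps 2-4 on the user's behalf.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class IdentityJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(IdentityJobDriver.class);  // step 1: describe the job
    conf.setJobName("identity-demo");
    conf.setMapperClass(IdentityMapper.class);             // pass-through map/reduce, for illustration only
    conf.setReducerClass(IdentityReducer.class);
    conf.setOutputKeyClass(LongWritable.class);            // TextInputFormat keys are byte offsets
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // Steps 2-4 happen inside runJob: upload the job files to the shared FS,
    // submit the job over RPC, then wait while the scheduler initializes and runs it.
    JobClient.runJob(conf);
  }
}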
The details
Job submission process
Executing the shell command
After writing the program, the user packages it as a jar and runs the jar command to submit the job. The submission is handled by the RunJar class: its main function unpacks the jar and sets up the environment, then passes the run arguments to the MapReduce program and runs it.
/**
* Unpack a jar file into a directory.
*
* This version unpacks all files inside the jar regardless of filename.
*
* @param jarFile the .jar file to unpack
* @param toDir the destination directory into which to unpack the jar
*
* @throws IOException if an I/O error has occurred or toDir
* cannot be created and does not already exist
*/
public static void unJar(File jarFile, File toDir) throws IOException {
unJar(jarFile, toDir, MATCH_ANY);
}
/**
* Creates a classloader based on the environment that was specified by the
* user. If HADOOP_USE_CLIENT_CLASSLOADER is specified, it creates an
* application classloader that provides the isolation of the user class space
* from the hadoop classes and their dependencies. It forms a class space for
* the user jar as well as the HADOOP_CLASSPATH. Otherwise, it creates a
* classloader that simply adds the user jar to the classpath.
*/
private ClassLoader createClassLoader(File file, final File workDir)
throws MalformedURLException {
ClassLoader loader;
// see if the client classloader is enabled
if (useClientClassLoader()) {
StringBuilder sb = new StringBuilder();
sb.append(workDir).append("/").
append(File.pathSeparator).append(file).
append(File.pathSeparator).append(workDir).append("/classes/").
append(File.pathSeparator).append(workDir).append("/lib/*");
// HADOOP_CLASSPATH is added to the client classpath
String hadoopClasspath = getHadoopClasspath();
if (hadoopClasspath != null && !hadoopClasspath.isEmpty()) {
sb.append(File.pathSeparator).append(hadoopClasspath);
}
String clientClasspath = sb.toString();
// get the system classes
String systemClasses = getSystemClasses();
List<String> systemClassesList = systemClasses == null ?
null :
Arrays.asList(StringUtils.getTrimmedStrings(systemClasses));
// create an application classloader that isolates the user classes
loader = new ApplicationClassLoader(clientClasspath,
getClass().getClassLoader(), systemClassesList);
} else {
List<URL> classPath = new ArrayList<>();
classPath.add(new File(workDir + "/").toURI().toURL());
classPath.add(file.toURI().toURL());
classPath.add(new File(workDir, "classes/").toURI().toURL());
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (File lib : libs) {
classPath.add(lib.toURI().toURL());
}
}
// create a normal parent-delegating classloader
loader = new URLClassLoader(classPath.toArray(new URL[classPath.size()]));
}
return loader;
}
The user's MapReduce program configures the job's runtime information (the Mapper class, the Reducer class, the number of Reduce Tasks, and so on) and finally calls JobClient.runJob in its main function to submit the job to the JobTracker side.
/**
* Utility that submits a job, then polls for progress until the job is
* complete.
* @param job the job configuration.
* @throws IOException if the job fails
*/
public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
RunningJob rj = jc.submitJob(job);
try {
if (!jc.monitorAndPrintJob(job, rj)) {
throw new IOException("Job failed!");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return rj;
}
/**
* Submit a job to the MR system.
* This returns a handle to the {@link RunningJob} which can be used to track
* the running-job.
* @param conf the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
* running-job.
* @throws FileNotFoundException
* @throws IOException
*/
public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
IOException {
return submitJobInternal(conf);
}
public RunningJob submitJobInternal(final JobConf conf)
throws FileNotFoundException, IOException {
try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@Override
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
Job job = Job.getInstance(conf);
job.submit();
return job;
}
});
Cluster prev = cluster;
// update our Cluster instance with the one created by Job for submission
// (we can't pass our Cluster instance to Job, since Job wraps the config
// instance, and the two configs would then diverge)
cluster = job.getCluster();
// It is important to close the previous cluster instance
// to cleanup resources.
if (prev != null) {
prev.close();
}
return new NetworkedJob(job);
} catch (InterruptedException ie) {
throw new IOException("interrupted", ie);
}
}
Uploading the job files
Before JobClient submits the job to the JobTracker, it performs initialization work, including: obtaining a job ID, creating the HDFS directories, uploading the job files, and generating the split files. All of this is done by submitJobInternal(job).
Uploading and downloading of MapReduce job files is handled by the DistributedCache tool, and the whole workflow is transparent to the user. The files are pushed to HDFS on the JobClient side.
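As a rough illustration of the "upload to a shared directory" part of that work (this is not Hadoop's actual submitJobInternal code; the staging path and helper name here are hypothetical):
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagingSketch {
  // Copy the user's job jar into a per-job directory on the shared file system,
  // so that every node that later runs a task can read it.
  public static Path stageJobJar(Configuration conf, String jobId, Path localJar) throws IOException {
    FileSystem fs = FileSystem.get(conf);                         // normally HDFS
    Path jobDir = new Path("/tmp/hadoop/mapred/staging", jobId);  // hypothetical staging location
    fs.mkdirs(jobDir);
    fs.copyFromLocalFile(localJar, new Path(jobDir, "job.jar"));
    return jobDir;
  }
}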
Generating the InputSplit files
After the MapReduce job is submitted, JobClient calls the InputFormat's getSplits method to produce the InputSplit information, which has two parts (a small sketch follows the list):
- InputSplit metadata: used by the JobTracker to build the data structures related to task locality
- The raw InputSplit data: used by each Map Task at initialization time to locate the data it should process
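To make the first step above concrete, the sketch below (my own illustration, not Hadoop's internal code) asks an old-API TextInputFormat for splits and prints each split's length and host list:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class SplitsSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SplitsSketch.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    TextInputFormat format = new TextInputFormat();
    format.configure(conf);
    // The second argument is only a hint; the real split count depends on file and block sizes.
    for (InputSplit split : format.getSplits(conf, 4)) {
      System.out.println(split.getLength() + " bytes on " + String.join(",", split.getLocations()));
    }
  }
}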
InputSplit handling mainly involves three classes: JobSplit, JobSplitWriter, and SplitMetaInfoReader.
JobSplit wraps the basic classes for reading and writing InputSplits, namely the three inner classes described below.
SplitMetaInfo: stored in the file job.splitmetainfo; the JobTracker reads this file during job initialization in order to create the Map Tasks.
/**
* This represents the meta information about the task split.
* The main fields are
* - start offset in actual split
* - data length that will be processed in this split
* - hosts on which this split is local
*/
public static class SplitMetaInfo implements Writable {
private long startOffset;
private long inputDataLength;
private String[] locations;
public SplitMetaInfo() {}
public SplitMetaInfo(String[] locations, long startOffset,
long inputDataLength) {
this.locations = locations;
this.startOffset = startOffset;
this.inputDataLength = inputDataLength;
}
public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
try {
this.locations = split.getLocations();
this.inputDataLength = split.getLength();
this.startOffset = startOffset;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
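// ... remaining getters and the Writable readFields()/write() methods are omitted here
}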
TaskSplitMetaInfo: the data structure that holds InputSplit metadata. The JobTracker reads this information from job.splitmetainfo when the job is initialized. The host list is a key factor the task scheduler uses to decide whether a task is data-local, while splitIndex records where, inside the job.split file, the data-location information for a new task lives. When a TaskTracker receives this index, it can read the InputSplit information from job.split and then run a new task.
/**
* This represents the meta information about the task split that the
* JobTracker creates
*/
public static class TaskSplitMetaInfo {
private TaskSplitIndex splitIndex; // index of this split's info within the job.split file
private long inputDataLength; // length of the InputSplit's data
private String[] locations; // hosts on which the InputSplit's data resides
public TaskSplitMetaInfo(){
this.splitIndex = new TaskSplitIndex();
this.locations = new String[0];
}
public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations,
long inputDataLength) {
this.splitIndex = splitIndex;
this.locations = locations;
this.inputDataLength = inputDataLength;
}
public TaskSplitMetaInfo(InputSplit split, long startOffset)
throws InterruptedException, IOException {
this(new TaskSplitIndex("", startOffset), split.getLocations(),
split.getLength());
}
public TaskSplitMetaInfo(String[] locations, long startOffset,
long inputDataLength) {
this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
}
public TaskSplitIndex getSplitIndex() {
return splitIndex;
}
public String getSplitLocation() {
return splitIndex.getSplitLocation();
}
public long getInputDataLength() {
return inputDataLength;
}
public String[] getLocations() {
return locations;
}
public long getStartOffset() {
return splitIndex.getStartOffset();
}
}
TaskSplitIndex: when the JobTracker assigns a new task to a TaskTracker, TaskSplitIndex tells the task where, in the job.split file, to find the location information for the data it should process.
/**
* This represents the meta information about the task split that the
* task gets
*/
public static class TaskSplitIndex {
private String splitLocation; // location of the job.split file
private long startOffset; // offset of this InputSplit's info within the job.split file
public TaskSplitIndex(){
this("", 0);
}
public TaskSplitIndex(String splitLocation, long startOffset) {
this.splitLocation = splitLocation;
this.startOffset = startOffset;
}
public long getStartOffset() {
return startOffset;
}
public String getSplitLocation() {
return splitLocation;
}
public void readFields(DataInput in) throws IOException {
splitLocation = Text.readString(in);
startOffset = WritableUtils.readVLong(in);
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, splitLocation);
WritableUtils.writeVLong(out, startOffset);
}
}
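Of the three classes listed earlier, SplitMetaInfoReader is the one that reads job.splitmetainfo back into TaskSplitMetaInfo objects at job-initialization time. A minimal sketch of calling it is shown below; it is an internal Hadoop class (the signature is as I remember it from the Hadoop 2.x source), so this is only for illustration:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

public class ReadSplitMetaSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path jobSubmitDir = new Path(args[0]);  // staging directory holding job.split and job.splitmetainfo
    JobID jobId = JobID.forName(args[1]);   // e.g. "job_1700000000000_0001"
    JobSplit.TaskSplitMetaInfo[] metaInfo =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
    for (JobSplit.TaskSplitMetaInfo info : metaInfo) {
      // One entry per map task: how much data it will read and which hosts hold that data.
      System.out.println(info.getInputDataLength() + " bytes on " + String.join(",", info.getLocations()));
    }
  }
}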
Submitting the job to the JobTracker
JobClient calls the RPC method submitJob to hand the job to the JobTracker. Inside JobTracker.submitJob, the following steps run in order (since I am reading the Hadoop 2.x source, the 1.x source is not reproduced here):
- Create a JobInProgress object for the job: this object maintains the job's runtime information and is mainly used to track the state and progress of the running job.
- Check whether the user has permission to submit jobs to the specified queue: Hadoop manages jobs and resources by queue; each queue is assigned a certain amount of resources, and each user belongs to one or more queues and may only use the resources of the queues it belongs to.
- Check whether the job's configured memory usage is reasonable
- Notify the TaskScheduler to initialize the job: the JobTracker does not initialize the job immediately after receiving it; instead it hands the job to the scheduler, which initializes jobs according to its own policy.
The JobTracker uses the observer design pattern to notify the TaskScheduler of the "new job submitted" event.
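As a rough illustration of that observer relationship (simplified, hypothetical names; in Hadoop 1.x the real types are JobInProgressListener and TaskScheduler):
import java.util.ArrayList;
import java.util.List;

public class ObserverSketch {
  // Fired by the tracker when a new job has been submitted.
  interface JobLifecycleListener {
    void jobAdded(String jobId);
  }

  // Stand-in for the JobTracker: registers listeners and notifies them on submission.
  static class SimpleJobTracker {
    private final List<JobLifecycleListener> listeners = new ArrayList<>();
    void addJobListener(JobLifecycleListener l) { listeners.add(l); }
    void submitJob(String jobId) {
      // ... create the JobInProgress, check queue permissions and memory limits ...
      for (JobLifecycleListener l : listeners) {
        l.jobAdded(jobId);  // the scheduler, not the tracker, decides when to initialize the job
      }
    }
  }

  // Stand-in for the TaskScheduler: queues the job for initialization at its own pace.
  static class SimpleTaskScheduler implements JobLifecycleListener {
    @Override
    public void jobAdded(String jobId) {
      System.out.println("queued " + jobId + " for initialization");
    }
  }

  public static void main(String[] args) {
    SimpleJobTracker tracker = new SimpleJobTracker();
    tracker.addJobListener(new SimpleTaskScheduler());
    tracker.submitJob("job_0001");
  }
}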
Job initialization process
The main work of initialization is to construct the Map Tasks and Reduce Tasks and to initialize them.
Hadoop breaks each job into four kinds of tasks: Setup Task, Map Task, Reduce Task, and Cleanup Task.
Their runtime information is all maintained by the TaskInProgress class, so creating these tasks really means creating TaskInProgress objects.
Setup Task: the marker task for job initialization.
It performs simple initialization work; once it finishes, the Map Tasks can start running. This type is split into a Map Setup Task and a Reduce Setup Task. Each job has one of each, occupying a Map slot and a Reduce slot respectively. The two do the same thing, and only one of them runs at any given time.
Map Task: the task that processes data in the Map phase. Its count, and the data split each instance processes, are determined by the InputFormat used in the application.
Reduce Task: the task that processes data in the Reduce phase. Its count is set by the user through the parameter mapred.reduce.tasks. Hadoop initially schedules only Map Tasks, and starts scheduling Reduce Tasks only after the fraction of completed Map Tasks reaches a threshold (5% by default); see the settings sketch after this list.
Cleanup Task: the marker task for job completion. It performs job cleanup work, such as deleting temporary directories used during the run. Once this task finishes, the job moves from the RUNNING state to the SUCCEEDED state.
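For reference, the Reduce Task count and the map-completion threshold mentioned above map to these old-API settings (a minimal sketch; 0.05 is the 1.x default for the slow-start threshold):
import org.apache.hadoop.mapred.JobConf;

public class TaskKnobsSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setNumReduceTasks(4);  // mapred.reduce.tasks: number of Reduce Tasks
    // Reduce Tasks are scheduled only after this fraction of Map Tasks have completed (default 0.05, i.e. 5%).
    conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.05f);
    System.out.println(conf.get("mapred.reduce.tasks"));
  }
}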