MapReduce V1:MapTask执行流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
在文章《MapReduce V1:TaskTracker设计要点概要分析》中我们已经了解了org.apache.hadoop.mapred.Child启动的基本流程,在Child VM启动的过程中会运行MapTask,实际是运行用户编写的MapReduce程序中的map方法中的处理逻辑,我们首先看一下,在Child类中,Child基于TaskUmbilicalProtocol协议与TaskTracker通信,获取到该Child VM需要加载的Task相关数据,包括Task本身,代码如下所示:

01 final TaskUmbilicalProtocol umbilical =
02 taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
03 @Override
04 public TaskUmbilicalProtocol run() throws Exception { // 建立Child到TaskTracker的RPC连接
05 return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
06 TaskUmbilicalProtocol.versionID,
07 address,
08 defaultConf);
09 }
10 });
11 ... ...
12 JvmContext context = new JvmContext(jvmId, pid); // 根据启动Child VM命令行传递的参数构造一个JvmContext对象
13 ... ...
14 JvmTask myTask = umbilical.getTask(context); // 基于umbilical获取到一个JvmTask
15 ... ...
16 task = myTask.getTask(); // 通过JvmTask获取到MapTask或ReduceTask

上面代码中,JvmTask中就包含了一个Task,也就是task,它可能是MapTask或ReduceTask。
看一下在org.apache.hadoop.mapred.Child中运行Task的基本代码,如下所示:

01 // Create a final reference to the task for the doAs block
02 final Task taskFinal = task;
03 childUGI.doAs(new PrivilegedExceptionAction<Object>() {
04 @Override
05 public Object run() throws Exception {
06 try {
07 // use job-specified working directory
08 FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
09 taskFinal.run(job, umbilical); // 这行是核心代码,运行实际的MapTask或ReduceTask
10 } finally {
11 TaskLog.syncLogs
12 (logLocation, taskid, isCleanup, logIsSegmented(job));
13 TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
14 trunc.truncateLogs(new JVMInfo(
15 TaskLog.getAttemptDir(taskFinal.getTaskID(),
16 taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
17 }
18
19 return null;
20 }
21 });

我们关注执行MapTask,上面,通过调用MapTask的run方法,来实际启动MapTask的运行。

MapTask整体执行流程

MapTask运行的整体流程,如下图所示:

MapReduce V1:MapTask执行流程分析
上面流程比较直观,我们结合MapTask的run方法的代码,来进行分析,代码如下所示:

01 @Override
02 public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
03 throws IOException, ClassNotFoundException, InterruptedException {
04 this.umbilical = umbilical;
05
06 // start thread that will handle communication with parent
07 TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext); // 创建TaskReporter对象
08 reporter.startCommunicationThread();
09 boolean useNewApi = job.getUseNewMapper();
10 initialize(job, getJobID(), reporter, useNewApi); // 初始化MapTask
11
12 // check if it is a cleanupJobTask
13 if (jobCleanup) {
14 runJobCleanupTask(umbilical, reporter); // 运行JobCleanupTask
15 return;
16 }
17 if (jobSetup) {
18 runJobSetupTask(umbilical, reporter); // 运行JobSetupTask
19 return;
20 }
21 if (taskCleanup) {
22 runTaskCleanupTask(umbilical, reporter); // 运行TaskCleanupTask
23 return;
24 }
25
26 if (useNewApi) {
27 runNewMapper(job, splitMetaInfo, umbilical, reporter); // 运行MapTask
28 } else {
29 runOldMapper(job, splitMetaInfo, umbilical, reporter);
30 }
31 done(umbilical, reporter); // Task运行完成
32 }

上面代码中,run方法的参数TaskUmbilicalProtocol umbilical表示一个RPC代理对象,通过该对象可以与TaskTracker进行通信,从而在Task运行过程中,能够将Task的运行进度、状态信息汇报给TaskTracker。
通过上面看出,启动一个MapTask,可能运行的是JobCleanupTask、JobSetupTask、TaskCleanupTask、Mapper四种Task之中的一种,运行每种Task都会向TaskTracker进行汇报。关于一个MapTask如何被划分的,可以参考JobTracker端在JobInProgress中initTasks()方法。
下面,我们根据MapTask中run方法中的处理流程,分为如下几个子流程,进行详细分析:

初始化Task分析

这里,主要分析在MapTask的run方法中,调用initialize方法的初始化逻辑。先看在调用initialize方法之前,首先创建了一个TaskReporter线程对象,该对象又是基于TaskUmbilicalProtocol umbilical来实现将Task运行状态汇报给TaskTracker。在initialize方法中的初始化流程如下所示:

  1. 根据JobConf job与JobID id,以及TaskReporter,创建一个JobContext对象
  2. 创建一个TaskAttemptContext对象
  3. 如果Task状态为TaskStatus.State.UNASSIGNED,修改为TaskStatus.State.RUNNING
  4. 根据JobConf创建OutputFormat,以及OutputCommitter
  5. 为该Task创建Job的输出目录等内容
  6. 初始化ResourceCalculatorPlugin,用来计算Task运行过程中对节点资源的使用情况

运行JobCleanupTask

执行JobCleanupTask,主要是清理Job运行过程中产生的数据,因为该Task可能上次运行过一次,但是失败,为了下一次重新运行,需要将之前失败Task的数据清理掉。即使Job运行成功,也需要清理MapTask执行后输出的中间结果数据,具体流程,如下图所示:

MapReduce V1:MapTask执行流程分析
上面的序列图比较详细,将Task运行过程中与TaskTracker之间进行通信都描述出来,我们总结如下几个要点:

  • 运行JobCleanupTask首先将当前Task运行阶段设置为CLEANUP,并向TaskTracker汇报状态变更
  • 如果Job的状态是JobStatus.State.FAILED,则删除在该节点上运行Task所产生的临时数据
  • 如果Job的状态是JobStatus.State.KILLED,同样删除临时数据,并在该Job对应的目录下创建_SUCCESS文件,标识Job成功
  • 最后,向TaskTracker汇报状态,更新相关TIP数据结构状态,并释放锁占用的资源,以供其他Task运行所使用

运行JobSetupTask

运行JobSetupTask,主要是初始化Job对应的临时目录,为后续运行Task做准备,具体处理流程,如下图所示:

MapReduce V1:MapTask执行流程分析
在创建运行Job的基本临时目录以后,也需要与TaskTracker通信,汇报该Task的运行状态。

运行TaskCleanupTask

TaskCleanupTask与JobCleanupTask类似,主要是清理Task运行过程中产生的一些临时目录和文件,具体流程如下图所示:

MapReduce V1:MapTask执行流程分析

运行MapTask

MapTask执行的核心逻辑在runNewMapper方法中,该方法中对应的处理流程,如下图所示:

MapReduce V1:MapTask执行流程分析
方法runNewMapper的声明,如下所示:

1 private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,
2 final TaskSplitIndex splitIndex,
3 final TaskUmbilicalProtocol umbilical,
4 TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException

其中,JobConf job包含了该Job的配置信息,TaskSplitIndex splitIndex包含了该MapTask所处理的InputSplit的信息(包括splitLocation和startOffset),TaskUmbilicalProtocol umbilical是与TaskTracker通信的代理对象,TaskReporter reporter是一个与TaskTracker通信的线程。
上面的序列图,所描述的具体处理流程,如下所示:

  1. 创建TaskAttemptContext对象
  2. 通过反射,创建Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>对象
  3. 通过反射,创建InputFormat<INKEY,INVALUE>对象
  4. 根据TaskSplitIndex splitIndex ,创建InputSplit对象
  5. 创建RecordReader<INKEY,INVALUE>对象,用来读取输入的InputSplit对应的文件
  6. 创建RecordWriter<K, V>对象,用来将处理后的数据写入文件系统
  7. 创建Mapper.Context对象,可以在编写MapReduce程序中,实现Mapper时使用
  8. 初始化RecordReader<INKEY,INVALUE>对象
  9. 执行Mapper的run方法,调用用户编写的MapReduce程序的Mapper中的处理逻辑,内部循环调用map方法
  10. 回收资源,关闭相关的流对象

下面,我们详细看一下,Mapper处理过程中相关的要点:

  • Mapper.Context结构

Mapper.Context是Mapper类的一个内部类,它包含了运行一个MapTask过程中所需要的所有上下文信息,该类的继承层次结构,如下图所示:

MapReduce V1:MapTask执行流程分析
可以看出,一个Mapper对应的执行上下文信息,继承了该Mapper对应TaskAttempt的上下文信息,再向上继承了Job的上下文信息,一个Job包含的配置信息都可以被一个Mapper读取到。
MapperContext中包含如下信息:

1 private RecordReader<KEYIN,VALUEIN> reader;
2 private InputSplit split;

通过使用一个RecordReader,能够读取InputSplit对应的HDFS上的Block文件数据。
TaskInputOutputContext中包含如下信息:

1 private RecordWriter<KEYOUT,VALUEOUT> output;
2 private StatusReporter reporter;
3 private OutputCommitter committer;

可见,在该层能够实现将Task运行的统计,通过StatusReporter以Counter的形式收集,并提供进度(Progress)获取接口。同时,使用RecordWriter将Mapper的输出写入到文件系统。
OutputCommitter是一个比较重要的对象,该抽象类代码,如下所示:

01 public abstract class OutputCommitter {
02
03 public abstract void setupJob(JobContext jobContext) throws IOException;
04
05 public void commitJob(JobContext jobContext) throws IOException {
06 cleanupJob(jobContext);
07 }
08
09 @Deprecated
10 public void cleanupJob(JobContext context) throws IOException { }
11
12 public void abortJob(JobContext jobContext, JobStatus.State state) throwsIOException {
13 cleanupJob(jobContext);
14 }
15
16 public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;
17
18 public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throwsIOException;
19
20 public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;
21
22 public abstract void abortTask(TaskAttemptContext taskContext) throws IOException;
23 }

可以参考实现类FileOutputCommitter,它主要负责,在Job运行过程中,管理Task执行过程中对应的文件或目录的信息,如开始运行之前创建目录,运行完成后将临时有用的文件移动到供Job共享的目录下,也会执行无用临时文件的清理。
TaskAttemptContext中主要是有关一个TaskAttempt的相关信息,包括TaskAttemptID及其状态信息。
JobContext主要包括一个Job的相关配置信息,如MapReduce Job的输入输出规格(输入输出KV格式)、Mapper类信息、输入文件格式信息,还有JobID信息。

  • Mapper处理流程

通过调用mapper.run(mapperContext);进入Mapper的执行流程,我们首先看一下,Mapper提供的接口的声明,代码如下所示:

01 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
02
03 public class Context extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { // Mapper.Context是Mapper运行时上下文对象,包含了运行MapTask所需要的基本信息
04 public Context(Configuration conf, TaskAttemptID taskid,
05 RecordReader<KEYIN,VALUEIN> reader,
06 RecordWriter<KEYOUT,VALUEOUT> writer,
07 OutputCommitter committer,
08 StatusReporter reporter,
09 InputSplit split) throws IOException, InterruptedException {
10 super(conf, taskid, reader, writer, committer, reporter, split);
11 }
12 }
13
14 /**
15 * Called once at the beginning of the task.
16 */
17 protected void setup(Context context) throws IOException, InterruptedException { // 执行map方法之前,进行初始化工作
18 // NOTHING
19 }
20
21 /**
22 * Called once for each key/value pair in the input split. Most applications
23 * should override this, but the default is the identity function.
24 */
25 @SuppressWarnings("unchecked")
26 protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { // 用户实际实现的map处理逻辑
27 context.write((KEYOUT) key, (VALUEOUT) value);
28 }
29
30 /**
31 * Called once at the end of the task.
32 */
33 protected void cleanup(Context context ) throws IOException, InterruptedException {// 执行map完成之后,进行清理
34 // NOTHING
35 }
36
37 /**
38 * Expert users can override this method for more complete control over the
39 * execution of the Mapper.
40 * @param context
41 * @throws IOException
42 */
43 public void run(Context context) throws IOException, InterruptedException { // 该方法是程序的驱动入库,循环调用map方法处理每一个键值对数据
44 setup(context);
45 try {
46 while (context.nextKeyValue()) {
47 map(context.getCurrentKey(), context.getCurrentValue(), context);
48 }
49 } finally {
50 cleanup(context);
51 }
52 }
53 }

上面run方法中,输入的InputSplit对应的每一个记录,循环调用map方法,map方法中是用户实现的MapReduce的Mapper中的代码。

Mapper输出流程分析

在runNewMapper方法中可以看到,如果Job有0个ReduceTask,则Mapper将结果直接写入到HDFS文件;而如果Job有大于0个ReduceTask,则Mapper将结果输出到本地,是中间结果。代码如下所示:

1 if (job.getNumReduceTasks() == 0) {
2 output =
3 new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
4 } else {
5 output = new NewOutputCollector(taskContext, job, umbilical, reporter);
6 }

上面,NewDirectOutputCollector实际上将Mapper的输出直接写入到HDFS文件;而NewOutputCollector将Mapper输出写入到本地文件,我们看一下NewOutputCollector是如何实现Mapper输出落地的,它的构造方法,代码如下所示:

01 NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
02 JobConf job,
03 TaskUmbilicalProtocol umbilical,
04 TaskReporter reporter) throws IOException, ClassNotFoundException {
05 collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); //
06 partitions = jobContext.getNumReduceTasks(); // 根据ReduceTask个数计算Mapper输出分区个数
07 if (partitions > 0) { // 分区个数大于0,创建一个Partitioner对象
08 partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
09 } else {
10 partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
11 @Override
12 public int getPartition(K key, V value, int numPartitions) {
13 return -1;
14 }
15 };
16 }
17 }

可以看出,上面的collector里面的逻辑非常核心,它是MapOutputBuffer类实例,该类实现也是相对复杂的。接着,我们从如下几个方面详细分析:

  • 使用Buffer存储键值对数据

Mapper输出后,会首先将键值对数据写入到一个Buffer中,通过其它的几个Buffer来跟踪键值对的在Buffer中偏移位置信息,以及键值对所属的分区(Partition)。我们看一下NewOutputCollector调用write方法输出键值对数据的逻辑:

1 @Override
2 public void write(K key, V value) throws IOException, InterruptedException {
3 collector.collect(key, value, partitioner.getPartition(key, value, partitions));
4 }

通过partitioner计算一个输出键值对所属的分区,然后调用MapOutputBuffer的collect方法实现Mapper中间结果数据输出。下面,我们看MapOutputBuffer涉及到的一些存储结构,其中包含3个buffer,如下代码所示:

1 private final int[] kvoffsets; // indices into kvindices
2 private final int[] kvindices; // partition, k/v offsets into kvbuffer
3 private byte[] kvbuffer;

上面的数组结构,kvbuffer用来存储Map输出的键值对数据,kvoffsets用来存储Map输出的键值对数据在kvbuffer中的偏移位置,kvindices用来记录一个键值对对应的分区、键开始、值开始位置的数据结构,具体我们会在后面详细说明。
首先,看一下用来定义和限制这些buffer的一些基本默认配置参数:

01 io.sort.mb = 100MB
02 io.sort.record.percent = 0.05
03 io.sort.spill.percent = 0.8
04
05 sortmb = 100MB
06 rEcper = 0.05
07 spillper = 0.8
08
09 RECSIZE = 16
10 ACCTSIZE = 3

基于这些配置的参数,计算得到一些buffer的大小及其限制配置,为了直观我们通过使用MB来表示大小,如下所示:

1 maxMemUsage = sortmb <<20 = 104857600 = 100MB
2 recordCapacity = maxMemUsage * recper = 104857600 * 0.05 = 5242880 = 5MB
3 recordCapacity -= recordCapacity % RECSIZE = 5242880 - 5242880 % 16 = 5242880 = 5MB
4 maxMemUsage – recordCapacity = 104857600 - 5242880 = 99614720 = 95MB
5
6 softBufferLimit = ((maxMemUsage – recordCapacity) * spillper) = 99614720 * 0.8 = 79691776 = 76MB
7 softRecordLimit = (recordCapacity * spillper) = 5242880 * 0.8 = 4194304 = 4MB

现在,我们看一下这些buffer的结构即相关配置参数,如下图所示:

MapReduce V1:MapTask执行流程分析
Mapper分别读取输入Split的每一行数据,然后输出键值对数据,这些键值对数据在上面讲述到的Buffer中的存储细节,如下图所示:
MapReduce V1:MapTask执行流程分析
上图中,输出了2个键值对数据,首先kvbuffer中会顺序存储每一个键值对数据,同时,kvoffsets会记录每一个键值对的顺序编号,或者说,在kvbuffer中的偏移位置,在kvindices中包含了每一个键值对的三个属性信息:所属分区(kvi partition)、键起始偏移量(ki start)、值起始偏移量(vi start)。

  • 将Buffer中键值对数据写入本地磁盘文件

当内存中的键值对数据的存储量在上述Buffer中达到一定限制(通过配置io.sort.spill.percent决定,默认是大约95MB的Buffer存储容量达到80%,即大约76MB时会触发将Buffer中数据写入磁盘文件)时,就会通过SpillThread线程,将Buffer中的数据写入本地磁盘文件中,可以在SpillThread类中的sortAndSpill方法中看到具体实现。在将Buffer中的键值对数据写入磁盘之前,先进行一次内存排序,排序的规则是:MapOutputBuffer内部有3个Buffer,排序是对键值对偏移位置的Buffer kvoffsets进行排序,保证每一个键值对所属的分区(Partition)按照升序排序,然后再保证每个分区(Partition)中的键值对按照键进行排序,这样最后得到的kvoffsets中的键值对数据就是按照分区进行分组,并且每个分组中是按照键排过序的。
如果用户编写的MapReduce程序,指定了Combiner,则再排序之后,写入磁盘文件之前,调用Combiner对数据进行合并,具体可以参考CombinerRunner实现类。
经过几次填充Buffer,达到配置写入容量值会写入文件,最终得到多个分区的、按照键(Key)排序的文件,在所有数据都输出结束后,会对上述生成的多个文件进行合并,合并成一个大文件,该大文件也是经过分区的、排序的文件。在最后的合并阶段,也会检查用户是否配置了Combiner,如果是则会对每个分区的数据做一个局部化简合并,输出到最终的一个文件中,并将多个小的分区文件删除。

上一篇:工信部放大招:将统一 Android 消息推送标准


下一篇:java B2B2C电子商务平台分析之十一------配置中心和消息总线