MapReduce工作流

MapReduce工作流

 

        如图,在MapReduce阶段,工作流程包括:数据输入分片、Map、Combiner(可选)、Shuffle、Reduce。

 数据分片

        在文件输入Map之前,需要进行数据分片,每一个InputSplit对应一个Mapper,其中分片信息是由List<InputSplit>存储的,包含数据路径、开始地址、长度、host。分片大小一般和HDFS块大小等同,主要是根据max(blockSize, min(minSize, maxSize))计算得到;其中blockSize为HDFS块大小,minSize默认1,maxSize默认Long.MAX_VALUE(9223372036854775807L)。如输入一个260MB的文件,将分为三块,其中最后一块的大小128MB,但在Linux系统中真实的大小依旧是4MB。

        其中分片大小=HDFS块大小的,是因为方便本地执行,如果一个分片占用两个及其以上的块,就需要消耗一定网络IO,相比于本地执行,效率会有所降低。

        针对跨Block或InputSplit的数据,Map会直接跨InputSplit直至读取完成这一行数据。数据读取都是在Buffer完成,当完成Buffer中的数据读取,会自动加载下一批次,并且若未遇到行尾,会继续读区新的一批直至行尾。第二个Map读取时,会查看上一个InputSplit的结尾是否为行尾,如不是,就直接从第二行开始读。

Map阶段

        执行编写好的map函数,而且一般map操作都是本地化操作也就是在数据存储节点上进行。数据完成Map及分区后,会向环形缓冲区写数据,当数据到达80%[float spillper = this.job.getFloat("mapreduce.map.sort.spill.percent", 0.8F);]时,就会溢写到本地磁盘。并且在向磁盘溢出写之前,会对数据创建对应索引及索引快排,同时当溢出文件大于3时,会对小文件合并[this.minSpillsForCombine = this.job.getInt("mapreduce.map.combine.minspills", 3);]。

Combiner阶段

        该阶段是可选的,是一种本地化的Reduce,主要在Map计算出中间结果之前,做一个简单的合并重复key的操作,减少数据冗余,提升传输速率。但该操作的也存在一定的风险,如平均值计算会影响最终的结果。

Shuffle阶段

MapReduce工作流

 

--Map阶段shuffle

        1)每个Map都有一个用来写入输出数据的循环内存缓冲区(默认100MB),当缓冲区的数据量达到80%时,会启动一个后台线程,把最终要传递的数据根据Reduce任务数划分成Partition,在每个Partition中,再根据Key进行排序,最后创建一个溢出文件。

        2)Map任务完成所有的数据处理后,便会有多个溢出文件spill,在完成任务前,溢出文件被合并成一个索引文件和数据文件(放置在运行 Map 任务的 TaskTracker 的本地磁盘上)。

        3)完成这些后,Map任务将会删除所有的临时溢出文件,并告知TaskTracker任务完成,只要有一个Map任务完成,Reduce就会开始Copy。

        其中存在合并/归并按顺序可分为:Combiner(溢出之前value相加)->combine(针对临时小文件spill合并)->归并merge(多个溢出文件spill被合并成一个索引文件和数据文件)

--Reduce阶段shuffle

       1) 启动复制线程(默认5,this.jobConf.getInt("mapreduce.reduce.shuffle.parallelcopies", 5);)获取Map的输出。

        2)将复制的文件先放入内存缓冲区(可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%),如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。

        3)Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候(通过mapred.job.shuffle.merge.percent配置,默认是66%),开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。

        4)在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。

        5)当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的文件,可能存在于内存中,但是默认情况下是位于磁盘中的。

Reducer如何确定拷贝哪些数据

        Map端进行partition的时候已经指定了每个Reducer要处理的数据,所以只需要拷贝对应的数据即可,至少处理一个partition。

Reduce阶段

        经过shuffle阶段处理后的数据,是分区并排序的index索引文件,ReduceTask从文件读取一个key传递给reduce方法并传一个value迭代器,通过迭代器判断下一个key值,相同返回value,不同调转下一个key。ReduceTask完成后,将分区文件归并排序生成大文件输出到HDFS。

上一篇:Hadoop面试题目


下一篇:【单点】每日突破,MapReduce调优篇