一、MapReduce的概念
MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框就是mapreduce,两者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程。
1.MapReduce编程模型
MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是"任务的分解与结果的汇总"。
在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。
在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
需要注意的是,用MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
2.MapReduce处理过程
在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,Hadoop函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
二、MapReduce工作原理
1.用故事说明什么是MapReduce
我问妻子:“你真的想要弄懂什么是MapReduce?” 她很坚定的回答说“是的”。 因此我问道: 我: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试) 妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。
妻子: 但这和MapReduce有什么关系?
我: 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce.
妻子: 好吧。
我:现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?
妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。
我: 没错,让我们把MapReduce的概念应用到食谱上。Map和Reduce其实是两种操作,我来给你详细讲解下。
Map(映射): 把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。
Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。
Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。
妻子: 所以,这就是MapReduce?
我: 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。
妻子: 分布式计算? 那是什么?请给我解释下吧。
我: 没问题。
我: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢?
妻子: 我会找一个能为我大量提供原料的供应商。 我:是的..就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。
妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。
我:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。
这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。
妻子:但是我怎么会制造出不同种类的番茄酱呢?
我:现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的 map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。
2.MapReduce运行原理
一个Map简要的图,来说明MapReduce计算框架有哪些构成,Map --》Shuffle(包括sort copy merge)-->Reduce -->输出;Split 由我们的HDFS完成;
下面是两个图,一个是Hadoop官方提供的图,另一个是自己画的,来详细说明三个阶段干的工作和工作流程:
Map
.每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。
.在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combia操作,这样做的目的是让尽可能少的数据写入到磁盘。
.当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combia操作,目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。
.将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。
Shuffle
.每个Map任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。
.在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直道spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
.每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。
.当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,并告知TaskTracker 任务已完成。Reducers 通过HTTP 来获取对应的数据。用来传输partitions 数据的工作线程个数由tasktracker.http.threads 控制,这个设定是针对每一个TaskTracker 的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
Reduce
.Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。
.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。
.合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
3.WordCount例子跑流程
假设有如下的两个文本文件来运行WorkCount程序:
Hello World Bye World
Hello Hadoop GoodBye Hadoop
map数据输入
Hadoop针对文本文件缺省使用LineRecordReader类来实现读取,一行一个key/value对,key取偏移量,value为行内容。
#如下是map1的输入数据:
Key1 Value1
0 Hello World Bye World #如下是map2的输入数据:
Key1 Value1
0 Hello Hadoop GoodBye Hadoop
map输出/Combine输入
#如下是map1的输出结果
Key2 Value2
Hello 1
World 1
Bye 1
World 1 #如下是map2的输出结果
Key2 Value2
Hello 1
Hadoop 1
GoodBye 1
Hadoop 1
Combine输出
Combiner类实现将相同key的值合并起来,它也是一个Reducer的实现。
#如下是combine1的输出
Key2 Value2
Hello 1
World 2
Bye 1 #如下是combine2的输出
Key2 Value2
Hello 1
Hadoop 2
GoodBye 1
Combiner视业务情况来用,减少MAP->REDUCE的数据传输,提高shuffle速度,就是在map中再做一次reduce操作。combiner使用的合适,可以在满足业务的情况下提升job的速度,如果不合适,则将导致输出的结果不正确。
对于wordcount来说,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。
reduce输出
Reducer类实现将相同key的值合并起来。
#如下是reduce的输出
Key2 Value2
Hello 2
World 2
Bye 1
Hadoop 2
GoodBye 1
即实现了WordCount的处理。
流程分析:
1.在客户端启动一个作业。
2.向JobTracker请求一个Job ID。
3.将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。
4.JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度(这里是不是很像微机中的进程调度呢,呵呵),当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行。
对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里需要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。
5.TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。
以上是在客户端、JobTracker、TaskTracker的层次来分析MapReduce的工作原理的。
MapReduce 角色
•Client :作业提交发起者。
•JobTracker: 初始化作业,分配作业,与TaskTracker通信,协调整个作业。
•TaskTracker:保持JobTracker通信,在分配的数据片段上执行MapReduce任务。
提交作业
•在作业提交之前,需要对作业进行配置
•程序代码,主要是自己书写的MapReduce程序。
•输入输出路径
•其他配置,如输出压缩等。
•配置完成后,通过JobClinet来提交
作业的初始化
•客户端提交完成后,JobTracker会将作业加入队列,然后进行调度,默认的调度方法是FIFO调试方式。
任务的分配
•TaskTracker和JobTracker之间的通信与任务的分配是通过心跳机制完成的。
•TaskTracker会主动向JobTracker询问是否有作业要做,如果自己可以做,那么就会申请到作业任务,这个任务可以使Map也可能是Reduce任务。
任务的执行
•申请到任务后,TaskTracker会做如下事情:
•拷贝代码到本地
•拷贝任务的信息到本地
•启动JVM运行任务
状态与任务的更新
•任务在运行过程中,首先会将自己的状态汇报给TaskTracker,然后由TaskTracker汇总告之JobTracker。
•任务进度是通过计数器来实现的。
作业的完成
•JobTracker是在接受到最后一个任务运行完成后,才会将任务标志为成功。
•此时会做删除中间结果等善后处理工作。
第二部分:错误处理
任务失败
•MapReduce在设计之出,就假象任务会失败,所以做了很多工作,来保证容错。
•一种情况: 子任务失败
•另一种情况:子任务的JVM突然退出
•任务的挂起
TaskTracker失败
•TaskTracker崩溃后会停止向Jobtracker发送心跳信息。
•Jobtracker会将该TaskTracker从等待的任务池中移除。并将该TaskTracker上的任务,移动到其他地方去重新运行。
•TaskTracker可以被JobTracker放入到黑名单,即使它没有失败。
JobTracker失败
•单点故障,Hadoop新的0.23版本解决了这个问题。
第三部分:作业调度
FIFO
•Hadoop 中默认的调度器,它先按照作业的优先级高低,再按照到达时间的先后选 择被执行的作业
公平调度器
•为任务分配资源的方法,其目的是随着时间的推移,让提交的作业获取等量的集 群共享资源,让用户公平地共享集群。具体做法是:当集群上只有一个任务在运行 时,它将使用整个集群,当有其他作业提交时,系统会将TaskTracker节点空间的时 间片分配给这些新的作业,并保证每个任务都得到大概等量的CPU时间。
容量调度器
•支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为 了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交 的作业所 占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值 最小的队 列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择 ,同时考虑用户资源量限制和内存限制。
配置公平调度器
•修改mapred-stie.xml 加入如下内容
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>/opt/hadoop/conf/allocations.xml</value>
</property>
<property>
<name>mapred.fairscheduler.poolnameproperty</name>
<value>pool.name</value>
</property>
•在 Hadoop conf 下创建 allocations.xml内容为
<?xml version="1.0"?>
<alloctions>
</alloctions>
样例:
<pool name="sample_pool">
<minMaps>5</minMaps>
<minReduces>5</minReduces>
<weight>2.0</weight>
</pool>
<user name="sample_user">
<maxRunningJobs>6</maxRunningJobs>
</user>
<userMaxJobsDefault>3</userMaxJobsDefault>
• 重启 JobTracker
•访问 http://jobTracker:50030/scheduler , 查看 FariScheduler 的 UI
• 提交任务测试
第四部分:Shuffle与排序
•Mapreduce 的 map 结束后,把数据重新组织,作为 reduce 阶段的输入,该过程称 之为 shuffle--- 洗牌。
•而数据在 Map 与 Reduce 端都会做排序。
Map
• Map 的输出是由collector控制的
• 我们从collect函数入手
Reduce
•reduce的Shuffle过程,分成三个阶段:复制Map输出、排序合并、reduce处理。
•主要代码在reduce的 run函数
Shuffle优化
•首先Hadoop的Shuffle在某些情况并不是最优的,例如,如果需要对2集合合并,那么其实排序操作时不需要的。
•我们可以通过调整参数来优化Shuffle
•Map端
•io.sort.mb
•Reduce端
•mapred.job.reduce.input.buffer.percent
第五部分:任务的执行时的一些特有的概念
推测式执行
•每一道作业的任务都有运行时间,而由于机器的异构性,可能会会造成某些任务会比所有任务的平均运行时间要慢很多。
•这时MapReduce会尝试在其他机器上重启慢的任务。为了是任务快速运行完成。
•该属性默认是启用的。
JVM重用
•启动JVM是一个比较耗时的工作,所以在MapReduce中有JVM重用的机制。
•条件是统一个作业的任务。
•可以通过mapred.job.reuse.jvm.num.tasks定义重用次数,如果属性是-1那么为无限制。
跳过坏记录
•数据的一些记录不符合规范,处理时抛出异常,MapReduce可以讲次记录标为坏记录。重启任务时会跳过该记录。
•默认情况下该属性是关闭的。
任务执行环境
•Hadoop为Map与Reduce任务提供运行环境。
•如:Map可以知道自己的处理的文件
•问题:多个任务可能会同时写一个文件
•解决办法:将输出写到任务的临时文件夹。目录为:{mapred.out. put.dir}/temp/${mapred.task.id}
第六部分:MapReduce的类型与格式
类型
•MapReduce的类型 使用键值对作为输入类型(key,value)•输入输出的数据类型是通过输入输出的格式进行设定的。
输入格式
•输入分片与记录
•文件输入
•文本输入
•二进制输入
•多文件输入
•数据库格式的输入
输入分片与记录
•Hadoop通过InputSplit表示分片。
•一个分片并不是数据本身,而是对分片数据的引用。
•InputFormat接口负责生成分片。
文件输入
•实现类:FileInputFormat
•通过文件作为输入源的基类。
•四个方法:
•addInputPath()
•addInputPaths()
•setInputPath()
•setInputPaths()
•FileInputFormat会按HDFS块的大小来分割文件
•避免分割
•继承FileInputFormat 重载isSplitable()
•return false
文本输入
•实现类:TextInputFormat
•TextInputFormat 是默认的输入格式。
•包括:
•KeyValueTextInputFormat
•NLineInputFormat
•XML
•输入分片与HDFS块之间的关系
•TextInputFormat的某一条记录可能跨块存在
二进制输入
•实现类:SequenceFileInputFormat
•处理二进制数据
•包括:
•SequenceFileAsTextInputFormat
•SequenceFileAsBinaryInputFormat
多文件输入
•实现类:MultipleInputs
•处理多种文件输入
•包括:
•addInputPath
数据库输入
•实现类:DBInputFormat
•注意使用,因为连接过多,数据库无法承受。
输出格式
•文本输出
•二进制输出
•多文件输出
•数据库格式的输出
文本输出
•实现类:TextOutputFormat
•默认的输出方式
• 以 "key \t value" 的方式输出
二进制输出
•基类: SequenceFileOutputFormat
•实现类: SequenceFileAsTextOutputFormat
MapFileOutputFormat
SequenceFileAsBinaryOutputFormat
多文件输出
•MutipleOutputFormat•MutipleOutputs
•两者的不同在于MutipleOutputs可以产生不同类型的输出