一起重新开始学大数据-hadoop篇-day47 mapreduce(总) |
MapReduce概述
-
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
-
MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,Map阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据。Reduce阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据【在这先把reduce理解为一个单独的聚合程序即可】。
MapReduce框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现分布式计算,非常简单。
- 这两个函数的形参和返回值都是<key、value>,使用的时候一定要注意构造<k,v>。
MapReduce过程
map任务处理
1.1 框架使用InputFormat类的子类把输入文件(夹)划分为很多InputSplit,默认,每个HDFS的block对应一个InputSplit。通过RecordReader类,把每个InputSplit解析成一个个<k1,v1>。默认,框架对每个InputSplit中的每一行,解析成一个<k1,v1>。
1.2 框架调用Mapper类中的map(…)函数,map函数的形参是<k1,v1>对,输出是<k2,v2>对。一个InputSplit对应一个map task。程序员可以覆盖map函数,实现自己的逻辑。
1.3(假设reduce存在)框架对map输出的<k2,v2>进行分区。不同的分区中的<k2,v2>由不同的reduce task处理。默认只有1个分区。
(假设reduce不存在)框架对map结果直接输出到HDFS中。
1.4 (假设reduce存在)框架对每个分区中的数据,按照k2进行排序、分组。分组指的是相同k2的v2分成一个组。注意:分组不会减少<k2,v2>数量。
1.5 (假设reduce存在,可选)在map节点,框架可以执行reduce归约。
1.6 (假设reduce存在)框架会对map task输出的<k2,v2>写入到linux 的磁盘文件中。至此,整个map阶段结束
shuffer过程
①
1.每个map有一个环形内存缓冲区,用于存储map的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容溢写到(spilt)磁盘的指定目录(mapred.local.dir)下的一个新建文件中。
2.写磁盘前,要partition,sort。如果有combiner,combiner排序后数据。
3.等最后记录写完,合并全部文件为一个分区且排序的文件。
②
1.Reducer通过Http方式得到输出文件的特定分区的数据。
2.排序阶段合并map输出。然后走Reduce阶段。
3.reduce执行完之后,写入到HDFS中。
reduce任务处理
2.1 框架对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程称作shuffle。
2.2 框架对reduce端接收的[map任务输出的]相同分区的<k2,v2>数据进行合并、排序、分组。
2.3 框架调用Reducer类中的reduce方法,reduce方法的形参是<k2,{v2…}>,输出是<k3,v3>。一个<k2,{v2…}>调用一次reduce函数。程序员可以覆盖reduce函数,实现自己的逻辑。
2.4 框架把reduce的输出保存到HDFS中。至此,整个reduce阶段结束。
例子 World count
mapreduce各阶段详解
输入处理类
-
InputFormat
抽象类,只是定义了两个方法。 -
FileInputFormat
FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat处理作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。 -
TextInputFormat
是默认的处理类,处理普通文本
文件
文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value
默认以\n或回车键作为一行记录
InputSplit
-
在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入。
-
当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,会有大量的map task运行,导致效率底下
-
例如:一个1G的文件,会被划分成8个128MB的split,并分配8个map任务处理,而10000个100kb的文件会被10000个map任务处理
-
Map任务的数量
-
一个InputSplit对应一个Map task
-
InputSplit的大小是由Math.max(minSize, Math.min(maxSize,blockSize))决定
-
单节点建议运行10—100个map task
-
map task执行时长不建议低于1分钟,否则效率低
-
特殊:一个输入文件大小为140M,会有几个map task?
-
FileInputFormat类中的getSplits
-
Record reader阶段
record reader通过输入格式将输入split解析成记录。record reader的目的只是将输入数据解析成记录,但不负责解析记录本身。它将数据转化为键/值(key/value)对的形式,key表示偏移量,value表示行文本内容,并传递给mapper处理。通常键是数据在文件中的位置,值是组成记录的数据块。
mapper
在mapper中,用户定义的map代码通过处理record reader解析的每个键/值对来产生0个或多个新的键/值对结果。键/值的选择对MapReduce作业的完成效率来说非常重要。
键是数据在reducer中处理时被分组的依据,值是reducer需要分析的数据。
combiner
combiner是一个可选的本地reducer,可以在map阶段聚合数据。
combiner通过执行用户指定的来自mapper的中间键对map的中间结果做单个map范围内的聚合。
例如:
一个聚合的计数是每个部分计数的总和,用户可以先将每个中间结果取和,再将中间结果的和相加,从而得到最终结果。
在很多情况下,这样可以明显地减少通过网络传输的数据量。在网络上发送一次(hello,3)要比三次(hello,1)节省更多的字节量。
通过combiner可以产生特别大的性能提升,并且没有副作用,因此combiner的应用非常广泛。
partitioner
partitioner的作用是将mapper(如果使用了combiner的话就是combiner)输出的键/值对拆分为分片(shard),每reducer对应一个分片。默认情况下,partitioner先计算目标的散列值(通常为md5值)。然后,通过reducer个数执行取模运算
key.hashCode()%(reducer的个数)
这种方式不仅能够随机地将整个键空间平均分发给每个reducer,同时也能确保不同mapper产生的相同键能被分发至同一个reducer。用户可以定制partitioner的默认行为,并可以使用更高级的模式,如排序。
当然,一般情况下是不需要改写partitioner的。对于每个map任务,其分好区的数据最终会写入本地文件系统,等待其各自的reducer拉取。
混排和排序
该步骤主要是将所有partitioner写入的输出文件拉取到运行reducer的本地机器上,然后将这些数据按照键排序并写到一个较大的数据列表中。排序的目的是将相同键的记录聚合在一起,这样其所对应的值就可以很方便地在reduce任务中进行迭代处理。这个过程完全不可定制,而且是由框架自动处理的。开发人员只能通过自定义Comparator对象来确定键如何排序和分组。
reduce
reducer将已经分好组的数据作为输入,并依次为每个键对应分组执行reduce函数。reduce函数的输入是键以及包含与该键对应的所有值的迭代器。在后文介绍的模式中,我们将看到在这个函数中有很多种处理方法。这些数据可以被聚合、过滤或以多种方式合并。当reduce函数执行完毕后,会将0个或多个键/值对发送到最后的处理步骤——输出格式。和map函数一样,因为reduce函数是业务处理逻辑的核心部分,所以不同作业的reduce函数也是不相同。
输出格式
输出格式获取reduce函数输出的最终键/值对,并通过record write将它写入到输出文件中。每条记录的键和值默认通过tab分隔,不同记录通过换行符分隔。虽然一般情况下可以通过自定义实现非常多的输出格式,但是,不管什么格式,最终的结果都将写到HDFS上。
序列化
通俗的说就是讲一头