MepReduce-开启大数据计算之门

Hadoop MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。早期的MapReduce(MR)框架简单明了,JobTracker作为MR框架的集中处理点,随着分布式系统集群的规模和其工作负荷的增长,显得力不从心:

1. JobTracker 存在单点故障。

2. JobTracker 任务重,资源消耗多,当MR任务非常多的时候,会造成很大的内存开销,增加了 JobTracker fail 的风险,业界总结出旧MR框架只能支持 4000节点主机的上限。

3. 在TaskTracker端,以MR任务的数目表示资源过于简单,没有考虑CPU和 内存的占用情况,如果两个大内存消耗的任务被调度到了同一个TaskTracker,很容易出现 OOM。

4. TaskTracker端,把资源强制划为 Map task slot和Reduce task slot, 当系统中只有Map task或只有Reduce task时,会造成资源浪费。

5. 源代码难读,bug 修复和版本维护的难度大。

6. 不易维护,MR框架会强制进行系统级别的升级更新,让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新版本而浪费大量时间。

MepReduce-开启大数据计算之门

    

    为从根本上解决旧 MR框架的性能瓶颈,促进 Hadoop的长远发展,从0.23.0版本开始,Hadoop 的MR框架完全重构,新的MR基于YARN 进行数据处理。

MepReduce-开启大数据计算之门

    重构根本的思想是将JobTracker的资源管理和任务调度&监控功能分离成单独的组件,ResourceManager 和ApplicationMaster 。

ResourceManager实现了以前JobTracker的一些功能,控制整个集群并管理资源的分配,包含两个组件:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。

ApplicationMaster承担了TaskTracker的功能,主要职责为向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。ApplicationMaster并不是一个物理节点,用户提交的每一个任务中均包含一个ApplicationMaster,可将其视为用户在创建业务代码时实现的任务框架。

 

NodeManager负责管理YARN 集群中的每个节点,提供针对集群中每个节点的服务。它是应用程序的容器,监控应用程序的资源使用情况 (CPU&内存&硬盘&网络)并且向调度器汇报。

MepReduce-开启大数据计算之门

   YARN调度MR任务的流程如下图所示:

MepReduce-开启大数据计算之门

    

    一个完整的MR任务大体分为Map和Reduce两个阶段,部分任务可能会增加Combiner阶段,MR任务的数据在执行中会被封装成键值对(key, value)的格式,且数据均需实现框架提供的序列化(*Writable)接口。下例以WorldCount为例详述MR具体过程。

MepReduce-开启大数据计算之门

    Map阶段完成对数据的拆分,具体数据处理方式由业务逻辑决定。上例中的Map将按行读入的数据进行分词计数,并将结果封装成<k2, v2>。

    Reduce阶段实现数据的整合过程,对Map的输出按业务整合。上例中主要实现对v3的求和,将结果封装成<k4, v4>,其中<k3, v3>需与<k2, v2>的数据类型保持一致。

 

     在Hadoop这样的集群环境中,大部分Map任务与Reduce任务的执行是在不同的节点上,数据在传输的过程中存在大量的IO操作(本地IO及网络传输),必然要对数据进行序列化和反序列化。序列化 (Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。MR基础的序列化数据类型有IntWritable、Text等,还为用户提供了Writable序列化接口,如果一个类实现了Writable接口,该类的对象可以作为MR任务中的key和value。

    集群运行大量的MR任务时,网络IO和本地IO操作会成为性能瓶颈。为提高性能,MR引入了Shuffle过程,期望实现以下目标: 

* 保证数据完整地从Map传递到Reduce;

* 在跨节点拉取数据时,尽可能减少对带宽的消耗;

* 减少磁盘IO对任务执行的影响。

    结合下图,重点讨论MR的Shuffle(洗牌)过程,包含partition(分区), sort(排序)与merge(合并)等操作。

1. 集群中Map与Reduce是多对多的关系,Map的处理结果交由哪个Reduce处理,通过分区规则确定。

    MR提供Partitioner接口,根据key或value及Reduce的数量来决定当前的输出数据最终应该交由哪个Reduce处理。默认规则是先对key hash,再以Reduce的数量取模,根据取模结果确定Reduce。默认的取模方式是为了实现Reduce负载均衡,用户可自定义分区规则。

2. 确定Reduce后,Mapper的结果会批量写入内存缓冲区(默认100M)。当缓冲区的数据大小超过设定比例(默认为80M)时,数据会从内存写入磁盘,写入的过程称为Spill(溢写),溢写由单独的线程完成,溢写的同时会对数据进行排序。默认按key值进行升序排列,字母按照字典顺序(A-Za-z)排列,用户通过WirtableComparable接口实现自定义排序。

 3.  同一节点的Map输出的结果有可能会送到不同的Reduce,为了减少与分区相关的索引记录,MR会对溢写到磁盘的文件及内存中的数据进行合并。如果用户设置了Combiner(可理解特殊的Reducer任务),在合并时会执行Combiner操作,合并相同的key。

    Combiner需谨慎使用,从WorldCount例子可知,Map的输出数据类型与Reduce的数据类型要一一对应,所以Combiner的输入输出数据类型不能改变;同时类似取平均值的场景不能使用Combiner,否则影响最终结果。   

    以上为Map端的Shuffle,下面分析Reduce的Shuffle过程。

MepReduce-开启大数据计算之门

1. Reduce不断从执行完Map任务的节点中获取Map结果数据。

2. Merge阶段,排序在此过程中进行,此处也可配置Combiner。Merge有三种形式:内存到内存、内存到磁盘(溢写)、磁盘到磁盘,默认不启用内存到内存的方式。溢写的过程与Map端类似,磁盘到磁盘的合并发生在所有Map数据处理完成后,生成最终的结果文件交给Reduce处理。

    Reduce中内存缓冲区大小要比Map端更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以大部分的内存都给Shuffle使用。 

3. Merge最终生成的结果文件可能存在于磁盘上,也可能存在于内存中,默认存放于磁盘中。当Reducer的输入文件生成时,整个Shuffle才最终结束。最后Reducer执行,输出结果到HDFS。

    MR的核心功能在于Shuffle过程,默认由框架提供。用户开发MR任务时,若无需自定义排序及分区规则,仅需编写Mapper和Reducer的业务逻辑即可实现分布式高并发的大数据计算。

上一篇:悬线法DP总结


下一篇:基于Hadoop的数据仓库Hive 基础知识