目录
一、定义
1.大数据时代
1)、三次信息化浪潮时间点以及形式
第一次:数字化 1980年左右,个人计算机诞生,社会生产力提高。
第二次:网络化 1995年左右,互联网流行,门户网站诞生,信息量急剧增加。
第三次:智能化 2010之后,物联网,云计算,大数据的产生,数据爆发
数字化奠定基础,实现数据资源的获取和积累;网络化构造平台,促进数据资源的流通和汇聚;智能化展现能力,通过多源数据的融合分析呈现信息应用的类人智能,帮助人类更好认知事物和解决问题。
2)、人类社会数据产生方式经历阶段
阶段一:运营试系统阶段 数据被动存储在数据库中
阶段二:用户原创内容阶段 web2.0 主动存储
阶段三:感知系统阶段 感知系统广应用
截止到2012年,数据量已经从TB(1024GB=1TB)级别跃升到PB(1024TB=1PB)、EB(1024PB=1EB)乃至ZB(1024EB=1ZB)级别。国际数据公司(IDC)的研究结果表明,2008年全球产生的数据量为0.49ZB,2009年的数据量为0.8ZB,2010年增长为1.2ZB,2011年的数量更是高达1.82ZB,相当于全球每人产生200GB以上的数据。而到2012年为止,人类生产的所有印刷材料的数据量是200PB,全人类历史上说过的所有话的数据量大约是5EB。IBM的研究称,整个人类文明所获得的全部数据中,有90%是过去两年内产生的。而到了2020年,全世界所产生的数据规模将达到今天的44倍。
3)、大数据的特点
1:数据量大:产生的数据多,数据集大
2:数据种类多:结构化数据,半结构化数据,非结构化数据
3:处理速度快:产生数据速度快,要快速处理数据,防止数据失去实时性
4:价值密度低:数据中只有极少数据有实际意义
4)、大数据的精髓
全样而非抽样,效率而非精确,相关而非因果
大数据带给我们的三个颠覆性观念转变:是全部数据,而不是随机采样;是大体方向,而不是精确制导;是相关关系,而不是因果关系。
·1:.不是随机样本,而是全体数据:在大数据时代,我们可以分析更多的数据,有时候甚至可以处理和某个特别现象相关的所有数据,而不再依赖于随机采样(随机采样,以前我们通常把这看成是理所应当的限制,但高性能的数字技术让我们意识到,这其实是一种人为限制);
2:不是精确性,而是混杂性:研究数据如此之多,以至于我们不再热衷于追求精确度;之前需要分析的数据很少,所以我们必须尽可能精确地量化我们的记录,随着规模的扩大,对精确度的痴迷将减弱;拥有了大数据,我们不再需要对一个现象刨根问底,只要掌握了大体的发展方向即可,适当忽略微观层面上的精确度,会让我们在宏观层面拥有更好的洞察力;
3:不是因果关系,而是相关关系:我们不再热衷于找因果关系,寻找因果关系是人类长久以来的习惯,在大数据时代,我们无须再紧盯事物之间的因果关系,而应该寻找事物之间的相关关系;相关关系也许不能准确地告诉我们某件事情为何会发生,但是它会提醒我们这件事情正在发生。
2.hadoop简介
1)、hadoop是什么,能干什么
Apache Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
Apache Hadoop项目为可靠的、可伸缩的分布式计算开发开源软件,用java语言编写的,可以在任何一个有JVM的机器上运行。
Hadoop实现了一个分布式文件系统,简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的硬件上;而且它提供高吞吐量来访问应用程序的数据,适合那些有着超大数据集的应用程序。HDFS放宽了POSIX的要求,可以以流的形式访问文件系统中的数据。它很擅长存储大量的半结构化的数据集。数据可以随机存放,所以一个磁盘的失败并不会带来数据丢失。Hadoop也非常擅长分布式计算——快速地跨多台机器处理大型数据集合。
2)、hadoop的发展史
Hadoop由 Apache Software Foundation 公司于 2005 年秋天作为Lucene的子项目Nutch的一部分正式引入。它受到最先由 Google Lab 开发的 Map/Reduce 和 Google File System(GFS) 的启发。
HDFS的起源来自于2003年谷歌的一篇论文:关于分布式文件系统GFS
MapReduce起源于2004年谷歌的另外一篇论文:Mapreduce系统
2006 年 3 月份,Map/Reduce 和 Nutch Distributed File System (NDFS) 分别被纳入称为 Hadoop 的项目中。
2008年Hadoop项目成为了apache旗下的*项目之一。
3)、Hadoop的特点
1:高可靠性:处理数据的能力比较稳定
2:高效性:处理数据的速度快,针对于TB级别以上的数据集
3:高可扩展性:集群节点可以扩展到千以上的数量
4:高容错性:以多副本的形式存储数据,可以为失败的任务重启另外的节点
5:成本低:可以运行在廉价的硬件上
注:
Hadoop是一个能够对大量数据进行分布式处理的软件框架。 Hadoop 以一种可靠、高效、可伸缩的方式进行数据处理。
Hadoop 是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。
Hadoop 是高效的,因为它以并行的方式工作,通过并行处理加快处理速度。
Hadoop 还是可伸缩的,能够处理 PB 级数据。
此外,Hadoop 依赖于社区服务,因此它的成本比较低,任何人都可以使用。
Hadoop是一个能够让用户轻松架构和使用的分布式计算平台。用户可以轻松地在Hadoop上开发和运行处理海量数据的应用程序。它主要有以下几个优点:
4)、Hadoop的核心技术、
提供了两个核心技术:
HDFS:hadoop分布式文件系统
MapReduce:并行计算框架
除了两个核心模块还有其他几个模块:
Hadoop Common:最基本的模块 原名(core)
Hadoop YARN:资源调度和管理系统
Hadoop Ozone:对象存储技术
Hadoop Submarine: Hadoop机器学习引擎
Hadoop2.x版本 四个模块:common、HDFS、MapReduce、Yarn
Hadoop3.x版本 以上六个模块都有
注:
hadoop1.x 与hadoop2.x 架构变化分析
Hadoop2相比较于Hadoop1.x来说,HDFS的架构与MapReduce的都有较大的变化,且速度上和可用性上都有了很大的提高,Hadoop2中有两个重要的变更:
(1)HDFS的NameNode可以以集群的方式布署,增强了NameNodes的水平扩展能力和高可用性,分别是:HDFSFederation与HA;
(2)MapReduce将JobTracker中的资源管理及任务生命周期管理(包括定时触发及监控),拆分成两个独立的组件,并更名为YARN(Yet Another Resource Negotiator);
5)、hadoop安装
请参照https://blog.csdn.net/Michael__One/article/details/86226193
二、hadoop组件详细
1、hdfs
1)、什么是hdfs
1.简单介绍
HDFS(Hadoop Distributed FileSystem),是Hadoop项目的两大核心之一,源自于Google于2003年10月发表的GFS论文,是对GFS的开源实现。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。
HDFS在设计之初,就是要运行在通用硬件(commodity hardware)上,即廉价的大型服务器集群上,因此,在设计上就把硬件故障作为一种常态来考虑,可以保证在部分硬件发生故障的情况下,仍然能够保证文件系统的整体可用性和可靠性。
2.特点HDFS是一个高度容错性的系统,适合部署在廉价机器上的分布式文件系统
HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用
HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。
HDFS也是一个易于扩展的分布式文件系统
3.优缺点
优点:
高容错性:数据存放多个副本,如有副本丢失,会自动复制回复
适合批处理:移动计算而不是移动数据,减少流量消耗,但是数据位置要暴露给计算框架
适合大数据处理:Pb级数据 百万规模以上的数据量,数千节点
流式文件访问:一次写入,多次读取,保证数据一致性
可构建在廉价机器上:通过副本策略,提高了容错
缺点:
不适合低延迟数据访问:HDFS的设计目标有一点是:处理大型数据集,高吞吐率。这势必要以高延迟为代价的。因此HDFS不适合处理一些用户要求时间比较短的低延迟应用请求。
不适合小文件存取:一是因此存取大量小文件需要消耗大量的寻地时间(比如拷贝大量小文件与拷贝同等大小的一个大文件) 。二是因为namenode把元信息存储在内存中,一个节点的内存是有限的。一个block元信息的内存消耗大约是150 byte。而存储1亿个block和1亿个小文件都会消耗掉namenode 20GB内存,哪个适合?当然是1亿个block(大文件)更省内存。
不适合并发写入、文件随机修改:HDFS上的文件只能有一个写者,也仅仅支持append操作,不支持多用户对同一文件的写操作,以及在文件任意位置进行修改。
2)、hdfs设计思想
现在想象一下这种情况:有四个文件 0.5TB的file1,1.2TB的file2,50GB的file3,100GB的file4;有7个服务器,每个服务器上有10个1TB的硬盘。
在存储方式上,我们可以将这四个文件存储在同一个服务器上(当然大于1TB的文件需要切分),我们需要使用一个文件来记录这种存储的映射关系吧。用户是可以通过这种映射关系来找到节点硬盘相应的文件的。那么缺点也就暴露了出来:
第一、负载不均衡。因为文件大小不一致,势必会导致有的节点磁盘的利用率高,有的节点磁盘利用率低。
第二、网络瓶颈问题。一个过大的文件存储在一个节点磁盘上,当有并行处理时,每个线程都需要从这个节点磁盘上读取这个文件的内容,那么就会出现网络瓶颈,不利于分布式的数据处理。
我们来看看HDFS的设计思想:以下图为例,来进行解释。
HDFS将50G的文件file3切成多个Block(存储块),每一个Block的大小都是固定的,比如128MB,它把这多个数据块以多副本的行式存储在各个节点上 ,再使用一个文件把哪个块存储在哪些节点上的映射关系存储起来。有了这样的映射关系,用户读取文件的时候就会很容易读取到。每个节点上都有这样的Block数据,它会分开网络瓶颈,利于分布式计算,解决了上面的第二个问题。因为每个块的大小是一样的,所以很容易实现负载均衡,解决了上面的第一个问题。
3)、块机制
在我们熟知的Windows、Linux等系统上,文件系统会将磁盘空间划分为每512字节一组,我们称之为"磁盘块",它是文件系统读写操作的最小单位。而文件系统的数据块(Block)一般是磁盘块的整数倍,即每次读写的数据量必须是磁盘块的整数倍。
HDFS同样引入了块(Block)的概念,块是HDFS系统当中的最小存储单位,在hadoop2.0(2.7.3?)中默认大小为128MB。在HDFS上的文件会被拆分成多个块,每个块作为独立的单元进行存储。多个块存放在不同的DataNode上,整个过程中 HDFS系统会保证一个块存储在一个数据节点上 。但值得注意的是 如果某文件大小或者文件的最后一个块没有到达128M,则不会占据整个块空间 。
当然块大小可以在配置文件中hdfs-default.xml中进行修改(此值可以修改)
<property> <name>dfs.blocksize</name> <value>134217728</value> <description>默认块大小,以字节为单位。可以使用以下后缀(不区分大小写):k,m,g,t,p,e以重新指定大小(例如128k, 512m, 1g等)</description> </property> <property> <name>dfs.namenode.fs-limits.min-block-size</name> <value>1048576</value> <description>以字节为单位的最小块大小,由Namenode在创建时强制执行时间。这可以防止意外创建带有小块的文件可以降级的大小(以及许多块)的性能。</description> </property> <property> <name>dfs.namenode.fs-limits.max-blocks-per-file</name> <value>1048576</value> <description>每个文件的最大块数,由写入时的Namenode执行。这可以防止创建会降低性能的超大文件</description> </property>
HDFS中的NameNode会记录文件的各个块都存放在哪个dataNode上,这些信息一般也称为元信息(MetaInfo) 。元信息的存储位置一般由dfs.namenode.name.dir来指定。
<property> <name>dfs.namenode.name.dir</name> <value>file://${hadoop.tmp.dir}/dfs/name</value> </property>
而datanode是真实存储文件块的节点,块在datanode的位置一般由dfs.datanode.data.dir来指定。
<property> <name>dfs.datanode.data.dir</name> <value>file://${hadoop.tmp.dir}/dfs/data</value> </property>
HDFS上的块为什么远远大与传统文件系统,是有原因的。目的是为了最小化寻址开销时间。
HDFS寻址开销不仅包括磁盘寻道开销,还包括数据库的定位开销,当客户端需要访问一个文件时,首先从名称节点(namenode)获取组成这个文件的数据块的位置列表,然后根据位置列表获取实际存储各个数据块的数据节点的位置,最后,数据节点根据数据块信息在本地Linux文件系统中找到对应的文件,并把数据返回给客户端,设计一个比较大的块,可以把寻址开销分摊到较多的数据中,相对降低了单位数据的寻址开销
举个例子: 块大小为128MB,默认传输效率100M/s ,寻址时间为10ms,那么寻址时间只占传输时间的1%左右
当然,块也不能太大,因为另一个核心技术MapReduce的map任务一次只处理一个数据块,如果任务太少,势必会降低工作的并行处理速度。
HDFS的块概念,在解决了大数据集文件的存储同时,不仅解决了文件存取的网络瓶颈问题,还
1、解决了大数据集文件的存储:大文件分块存储在多个数据节点上,不必受限于单个节点的存储容量。
2、简化系统设计:块大小固定,单个节点的块数量比较少,容易管理。元数据可以单独由其他系统负责管理。
3、适合数据备份:每个块可以很容易的冗余存储到多个节点上,提高了系统的容错性和可用性
注:副本存放策略
数据副本的存放策略。HDFS将每个文件进行分块存储,同时一个数据块又有多个副本,这些副本如何分布在不同机器上呢?
放在同一台机器肯定是不行的,这样冗余存储就没什么意义了,一种简单但没有优化的方法是放在不同的机架上,这样可以有效防止一台机架出了问题后数据的丢失,并且允许读数据时候 充分利用多个机架之间的带宽,通过将副本均匀分布在集群中,有利于组件失效情况下的负载均衡。
但是这种策略,一个写操作就需要将数据传输到各个机架,因而增大了写操作的代价。HDFS的默认副本数目是3,目前的 操作是:
1、第一块,在本机器的HDFS目录下存储一个block
2、第二块,另寻一个机架的某个DataNode上存放
3、第三块,在该机架的同一个机架上找某一台机器存放最后一块。
这样,就减少了机架间的数据传输,提高了写操作的效率,而且能保证对这个块的访问能优先在本机架上找到,如果这一整个机架出现了问题,也可以在另外的机架(Rack)上找到。
4)、hdfs体系结构
HDFS 采用的是master/slaves这种主从的结构模型来管理数据,这种结构模型主要由四个部分组成,分别是Client(客户端)、Namenode(名称节点)、Datanode(数据节点)和SecondaryNameNode(第二节点,上面的图解中没有涉及到,下面会讲解)。真正的一个HDFS集群包括一个Namenode和若干数目的Datanode。Namenode是一个中心服务器,负责管理文件系统的命名空间 (Namespace )及客户端对文件的访问。集群中的Datanode一般是一个节点运行一个DataNode进程,负责管理客户端的读写请求,在Namenode的统一调度下进行数据块的创建、删除和复制等操作。数据块实际上都是保存在Datanode本地的Linux文件系统中的。每个Datanode会定期的向Namenode发送数据,报告自己的状态(我们称之为心跳机制)。没有按时发送心跳信息的Datanode会被Namenode标记为“宕机”,不会再给他分配任何I/O请求。
用户在使用Client进行I/O操作时,仍然可以像使用普通文件系统那样,使用文件名去存储和访问文件,只不过,在HDFS内部,一个文件会被切分成若干个数据块,然后被分布存储在若干个Datanode上。
比如,用户在Client上需要访问一个文件时,HDFS的实际工作流程如此:客户端先把文件名发送给Namenode,Namenode根据文件名找到对应的数据块信息及其每个数据块所在的Datanode位置,然后把这些信息发送给客户端。之后,客户端就直接与这些Datanode进行通信,来获取数据(这个过程,Namenode并不参与数据块的传输)。这种设计方式,实现了并发访问,大大提高了数据的访问速度。
HDFS集群中只有唯一的一个Namenode,负责所有元数据的管理工作。这种方式保证了Datanode不会脱离Namenode的控制,同时,用户数据也永远不会经过Namenode,大大减轻了Namenode的工作负担,使之更方便管理工作。通常在部署集群中,我们要选择一台性能较好的机器来作为Namenode。当然,一台机器上也可以运行多个Datanode,甚至Namenode和Datanode也可以在一台机器上,只不过实际部署中,通常不会这么做的
5)、体系解析
1、Client
严格意义上,HDFS集群上是没有客户端这一部分的,它只是提供了用户操作HDFS的程序接口。HDFS提供了各种各样的客户端接口,比如命令行接口、Java API, Thrift接口、C语言库、用户空间文件系统〔Filesystem in Userspace,FUSE)等。这样,用户就可以有选择的使用客户端程序来操纵HDFS进行浏览、读取和写入等操作了。客户端程序可以在集群外的任意一台机器上,也可以在集群中的一个节点上。只要客户端能拥有Namenode的ip地址以及端口号信息。
2、Namenode
Namenode是集群主从(master/slaves)结构模型中的唯一一个管理者(master),它负责维护HDFS文件系统的命名空间,以及管理数据块位置信息,也管理着客户端的读写操作。命名空间(Namespace)的管理:命名空间指的就是文件系统树及整棵树内的所有文件和目录的元数据,每个Namenode只能管理唯一的一命名空间。HDFS暂不支持软链接和硬连接。Namenode会在内存里维护文件系统的元数据,同时还使用fsimage和editlog两个文件来辅助管理元数据,并持久化到本地磁盘上
fsimage:命名空间镜像文件,它是文件系统元数据的一个完整的永久检查点,内部维护的是最近一次检查点的文件系统树和整棵树内部的所有文件和目录的元数据,如修改时间,访问时间,访问权限,副本数据,块大小,文件的块列表信息等等。
使用XML格式查看fsimage文件:
$ hdfs oiv -i 【fsimage_xxxxxxxxxxxx】 -o 【目标文件路径】 -p XML
如:$ hdfs oiv -i fsimage_0000000000000000052 -o ~/fs52.xml -p XMLeditlog:编辑日志文件,当hdfs文件系统发生打开、关闭、创建、删除、重命名等操作产生的信息除了在保存在内存中外,还会持久化到编辑日志文件。比如上传一个文件后,日志文件里记录的有这次事务的txid,文件的inodeid,数据块的副本数,数据块的id,数据块大小,访问时间,修改时间等
查看editlog文件的方式:
$ hdfs oev -i 【edits_inprogress_xxxxxxxxxxxxxx】 -o 【目标文件路径】
如: $ hdfs oev -i edits_inprogress_000000000000000003 -o ~/ed3.xmlNamenode在内存中,也记录着每个文件的各个块所在的数据节点的位置信息(数据块-Datanode映射信息),但是并不持久保存位置信息,因为这些信息会在文件系统启动时动态的重建这些信息(心跳机制,blockreport)。
当然,Namenode也管理着客户端对文件的访问等操作,比如客户端想要访问一个文件,得需要通过Namenode的检查,比如权限检查,文件是否存在,及其数据块的Datanode列表信息等。
3、Datanode
Datanode是HDFS主从结构模式中的工作者,在HDFS集群上,有若干个Datanode。Datanode负责数据块的存储和读取(会根据客户端或者Namenode的调度来进行数据的存储和检索),数据块会存储在节点的本地Linux文件系统中(包括block和block.meta)。Datanode会定期向Namenode发送自己所存储的块的列表(心跳机制,blockreport)。4、SecondaryNamenode
SecondaryNamenode,是HDFS集群中的重要组成部分,它可以辅助Namenode进行fsimage和editlog的合并工作,减小editlog文件大小,以便缩短下次Namenode的重启时间,能尽快退出安全模式。
6)、检查点机制
两个文件的合并周期,称之为检查点机制(checkpoint),是可以通过hdfs-default.xml配置文件进行修改的
<property> <name>dfs.namenode.checkpoint.period</name> <value>3600</value> <description>两次检查点间隔的秒数,默认是1个小时</description> </property> <property> <name>dfs.namenode.checkpoint.txns</name> <value>1000000</value> <description>txid执行的次数达到100w次,也执行checkpoint</description> </property> <property> <name>dfs.namenode.checkpoint.check.period</name> <value>60</value> <description>60秒一检查txid的执行次数</description> </property>
我们再来看看SecondaryName是如何创建检查点的?
1)、SecondaryNamenode请求Namenode停止使用正在编辑的editlog文件,Namenode会创建新的editlog文件(小了吧),同时更新seed_txid文件。
2)、SecondaryNamenode通过HTTP协议获取Namenode上的fsimage和editlog文件。
3)、SecondaryNamenode将fsimage读进内存当中,并逐步分析editlog文件里的数据,进行合并操作,然后写入新文件fsimage_x.ckpt文件中。
4)、SecondaryNamenode将新文件fsimage_x.ckpt通过HTTP协议发送回Namenode。
5)、Namenode再进行更名操作。安全模式
Namenode启动时,首先要加载fsimage文件到内存,并逐条执行editlog文件里的事务操作,在这个期间一但在内存中成功建立文件系统元数据的映像,就会新创建一个fsimage文件(该操作不需要SecondaryNamenode)和一个空的editlog文件。在这个过程中,namenode是运行在安全模式下的,Namenode的文件系统对于客户端来说是只读的,文件修改操作如写,删除,重命名等均会失败。
Namenode不会存储数据块的位置信息,因此Namenode在启动后,还会等待接受Datanode的blockreport(数据块的状态报告)。严格来说,只有接受到状态报告后,客户端的读操作才能成功完成。
PS:启动一个刚刚格式化完的集群时,HDFS还没有任何操作呢,因此Namenode不会进入安全模式。
查看namenode是否处于安全模式:
$ hdfs dfsadmin -safemode get Safe mode is ON
管理员可以随时让Namenode进入或离开安全模式,这项功能在维护和升级集群时非常关键
hdfs dfsadmin -safemode enter hdfs dfsadmin -safemode leave
将下面的属性的值设置为大于1,将永远不会离开安全模式
<property> <name>dfs.namenode.safemode.threshold-pct</name> <value>0.999f</value> </property>
有时,在安全模式下,用户想要执行某条命令,特别是在脚本中,此时可以先让安全模式进入等待状态
hdfs dfsadmin -safemode wait
7)、读写流程
客户端与HDFS、Namenode和Datanode之间的数据流到底是什么样子的。我们分别从两个方面来了解数据流,一个是客户端从HDFS中读数据,一个是客户端向HDFS中写数据。
一、读流程解析
我们来结合一下代码和流程图来解析一下HDFS的读流程。public static void main(String[] args) throws Exception { Configuration conf = new Configuration();// 读取配置文件 core-default 和 core-site.xml FileSystem fs = FileSystem.get(conf);// 获取DistributedFileSystem // System.out.println(fs.getClass().getName());//确认类型 Path path = new Path("/hyxy/LICENSE.txt");// 创建Path对象,指定要读取的文件 FSDataInputStream fsinput = fs.open(path);// 向Namenode发送读取请求 IOUtils.copyBytes(fsinput, System.out, 4096, false);// 将输入流内内的数据复制到System.out输出流中 IOUtils.closeStream(fsinput);// 关闭流 }
1、获取分布式文件系统对象
在客户端中,我们在读取文件之前要做的工作,必然是通过FileSystem的get()方法,来加载相关配置文件,分析相关参数的值,获取DistributedFileSystem对象。DistributedFileSystem是FileSystem的子类,在加载配置文件时,会读取本地jar包内置的core-default.xml默认配置文件以及本地的core-site.xml,而本地的core-site.xml的fs.defaultFS的值为file:///,所以,还要将你在集群中etc/hadoop/core-site.xml复制一份到你的eclipse项目下的src目录里。这样get方法加载的就是你集群的配置了。2、调用文件系统对象的open方法
客户端通过调用DistributedFileSystem的open方法(图中步骤1),向HDFS的Namenode发送请求,希望读取指定路径下的文件。这个时候,客户端的DistributedFileSystem是通过远程调用协议(rpc)来请求Namenode的,Namenode从内存中确定文件的第一个块,以及块副本的位置信息,也就是Datanode节点列表,发送回给客户端(图中步骤2)。并返回一个输入流对象FSDataInputStream,这个FSDataInputStream是javaIO流中的DataInputStream的子类。3、调用read方法,读取数据块
public static void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException { PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; byte buf[] = new byte[buffSize]; int bytesRead = in.read(buf); while (bytesRead >= 0) { out.write(buf, 0, bytesRead); if ((ps != null) && ps.checkError()) { throw new IOException("Unable to write to output stream."); } bytesRead = in.read(buf); //while循环中重复执行read方法 } }
客户端获取输入流对象FSDataInputStream后,会调用该对象的read方法来读取数据块(图中步骤3)。FSDataInputStream是一个支持文件定位的输入流,内部封装了DFSInputStream对象,该对象管理着DataNode和Namenode的I/O操作。DFSInputStream会选择离客户端最近的存有块副本的Datanode节点上读取数据。通过上图copyBytes方法的源码,我们可以看到,会重复调用read()方法来数据块的数据(图中步骤4),当数据块被读完后,DFSInputStream关闭与该Datanode的连接。4、请求读取下一个块数据
DFSInputStream读取完,并关闭与存有第一个数据块的Datanode的连接后,DFSInputStream就会继续从Namenode发送回来的下一个块的Datanode列表信息中寻找最佳的Datanode节点,继续读取(图中步骤五)。5、调用close方法,结束读流程
当文件的最后一个数据块,被DFSInputStream读取完毕后,客户端就会调用FSDataInputStream的close方法,结束读流程任务。6、读流程的错误处理
在读取数据过程中,如果DFSInputStream与Datanode的通信出现错误后,会尝试从离该块所在Datanode的最近的存储该块副本的Datanode上继续读取数据。同时记录这个故障Datanode,用于保证以后不会再从该节点上读取其他块信息。另一个种情况,DFSInputStream也会校验和确认从Datanode上发来的数据是否完整(校验和)。如果发现有损坏,DFSInputStream会尝试从其他Datanode上读取该块的副本。当然也会将损坏的块通知给Namenode,由Namenode进行副本数恢复。
二、写流程解析
下面我们在从写操作API和流程图,来详细解析以下HDFS的写流程。public static void main(String[] args) throws Exception { //加载本地指定目录下的文件 InputStream is = new BufferedInputStream( new FileInputStream("D:/source/App/ApacheSoft/hadoop-2.6.1.tar.gz")); Configuration conf = new Configuration(); //获取分布式文件系统 FileSystem fs = FileSystem.get(conf); //指定要上传到HDFS文件系统上的目录和文件名 Path des = new Path("/apps/apache/hadoop.tar.gz"); //获取输出流对象 FSDataOutputStream fsdos = fs.create(des); //开始上传 IOUtils.copyBytes(is, fsdos, 4096, false); //关闭流对象 IOUtils.closeStream(fsdos); }
从代码中,我们可以看到,我们的目的是将D盘上的ApacheSoft目录下的Hadoop-2.6.1.tar.gz压缩包文件复制到HDFS文件系统的/apps/apache目录下,文件名为hadoop.tar.gz。我们来详细解析一下步骤:1、获取分布式文件系统对象
和读取流程一样,在客户端要使用Configuration类来加载配置文件信息,然后再调用FileSystem的get()方法,获取一个分布式文件系统对象DistributedFileSystem(看代码片段)。2、客户端请求Namenode
客户端调用DistributedFileSystem的create(Path p)方法,向Namenode发送请求新建指定路径下的文件(步骤1)。客户端采用远程调用协议(RPC)与Namenode进行通信,这个时候Namenode要经过各种不同的检查,如命名空间里该路径文件是否已存在,客户端是否有相应权限。如果没有通过检查,返回IOException。如果检查通过,Namenode就会在命名空间下新建该文件,记录元数据(步骤2,注意,此时新文件为0字节,还没有数据块的信息),并返回一个FSDataOutputStream输出流对象。FSDataOutputStream封装了一个DFSDataOutputStream对象,由该对象负责处理datanode和namenode之间的通信。这个时候,无需考虑父目录是否存在,create方法会帮助创建多级目录。
3、客户端加载数据
客户端调用DFSOutputStream的create方法,开始执行写入操作。DFSOutputStream先将数据封装成一个个的数据包Packet,依次写入一个缓存队列中,这个缓存队列,我们称之为dataqueue。然后由线程DataStreamer,向Namenode请求一组合适的Datanode列表,这一组Datanode构成一个管线(Pipeline of datanodes)。4、向Datanode中写数据
当Datanode管线确认好之后,DataStreamer开始从dataqueue中依次取出Packet数据包,向管线中的datanode写入。第一个Packet写入管线中的第一个Datanode的内存中,然后第一个Datanode再将已经写好的Packet,发送给管线中的第二个Datanode,第二个Datanode将Packet写入内存后,再将Packet发送给第三个Datanode,然后写到内存中(图中步骤4)。DataStreamer在将Packet写入管线中时,同时也会将该Packet存储到另外一个由ResponseProcessor线程管理的缓存队列ackqueue中,这个队列我们称之为“确认队列”。ResponseProcessor线程会等待Datanode列表写好的确认信息。当收到所有的Datanode的确认信息后(图中步骤5),该线程再将ackqueue里的packet删除。
5、调用close方法
当写入一个块大小的n个packet数据包后,客户端调用FSDataOutputStream的close()方法(图中步骤6)。在调用close方法前,Datanode列表将内存中的数据写入本地磁盘。然后,DataStreamer继续向Namenode请求下一个块的Datanode列表,开始下一个块的写入。直到写完整个文件的最后一个块数据,然后客户端通知Namenode,整个文件已经写完(图中步骤7)。
三、MapReduce
在大数据时代,除了需要解决大规模数据的高效存储问题,还需要解决的另一个问题,就是如何对大规模数据进行高效处理?我们知道Hadoop的有两个核心技术,一个是HDFS(解决了大规模数据的存储问题),另一个是MapReduce。而MapReduce这个核心技术就是解决如何对大规模数据进行高效处理的并行编程模型。
1、MapReduce模型简介
1)、分布式并行编程
在过去的很长一段时间里,CPU的性能都会遵循”摩尔定律“,在性能上每隔18个月左右就是提高一倍。那个时候,不需要对程序做任何改变,仅仅通过使用更优秀的CPU,就可以进行性能提升。但是现在,在CPU性能提升的道路上,人类已经到达了制作工艺的瓶颈,因此,我们不能再把希望寄托在性能更高的CPU身上了。
现在这个时候,大规模数据存储在分布式文件系统上,人们也开始采用分布式并行编程来提高程序的性能。分布式程序运行在大规模计算机集群上,集群是大量的廉价服务器,可以并行执行大规模数据处理任务,这样就获得了海量的计算能力
分布式并行编程比传统的程序有明显的区别,它运行在大量计算机构成的集群上,可以充分利用集群的并行处理能力;同时,通过向集群中增加新的计算节点,就可以很容易的实现集群计算能力的扩展。
Hadoop MapReduce是谷歌提出的分布式并行编程模型MapReduce论文的开源实现,运行在分布式文件系统HDFS上的并行编程模型。MapReduce的优势在于处理大规模数据集(1TB以上)。
2)、MapReduce模型
我们应该知道,Hadoop的MapReduce核心技术起源于谷歌在2004年发表的关于MapReduce系统的论文介绍。论文中有这么一句话:Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages。这句话提到了MapReduce思想来源,大致意思是,MapReduce的灵感来源于函数式语言(比如Lisp)中的内置函数map(映射)和reduce(规约)。
简单来说,在函数式语言里,map表示对一个列表(List)中的每个元素做计算,reduce表示对一个列表中的每个元素做迭代计算。它们具体的计算是通过传入的函数来实现的,map和reduce提供的是计算的框架。我们想一下,reduce既然能做迭代计算,那就表示列表中的元素是相关的(比如我想对列表中的所有元素做相加求和,那么列表中至少都应该是数值吧)。而map是对列表中每个元素做单独处理的,这表示列表中可以是杂乱无章的数据。这样看来,就有点联系了。在MapReduce里,Map处理的是原始数据,自然是杂乱无章的,每条数据之间互相没有关系;到了Reduce阶段,数据是以key后面跟着若干个value来组织的,这些value有相关性,至少它们都在一个key下面,于是就符合函数式语言里map和reduce的基本思想了。
MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员需要为每个阶段写一个函数,分别是map函数和reduce函数。
在执行MapReduce任务时,一个大规模的数据集会被划分成许多独立的等长的小数据块,称为输入分片(input split)或简称"分片"。Hadoop为每个输入分片分别构建一个map任务,并由该任务来运行用户自定义的map函数,从而处理分片中的每条记录。map任务处理后的结果会继续作为reduce任务的输入,最终由reduce任务输出最后结果,并写入分布式文件系统。
MapReduce设计的一个理念是“计算向数据靠拢”(移动计算),而不是“数据向计算靠拢”(移动数据)。因为移动数据需要大量的网络传输开销,尤其是在大规模数据环境下,这种开销尤为惊人,所以移动计算要比移动数据更加经济。所以,在一个集群中,只要有可能,MapReduce框架就会将Map程序就近的在HDFS数据所在的节点上运行,即将计算节点和存储节点放在一起运行,从而减少节点间的数据移动开销。正因为这样,MapReducekeyi 并行的进行处理,解决计算效率问题。
2)、Map和Reduce函数
MapReduce模型的核心是Map函数和Reduce函数,二者都是由程序员负责具体实现的。MapReduce编程值所以比较容易,是因为程序员只要关注如何实现Map和Reduce函数,不需要处理并行编程中的其他各种复杂问题,如分布式存储,工作调度,负载均衡,容错处理,网络通信等,这些问题都会由MapReduce框架负责处理。
Map函数和Reduce函数都是以<key,value>作为输入,按一定的映射规则转换成另一个或一批<key,value>进行输出(见下表)。
Map函数的输入是来自于分布式文件系统的文件块,这些文件块的格式是任意的,可以是文档,也可以使二进制格式的。文件块是一系列元素的集合,这些元素也是任意类型的,同一个元素不能跨文件块存储。Map函数将输入的元素转换成<key,value>形式的键值对,键和值得类型也都是任意的,其中,键不同于一般的标志属性,即键没有唯一性,不能作为输出的身份标识,即使是同一输入元素,也可通过一个Map任务生成具有相同键的多个<key,value>。Reduce函数的任务就是将输入的一些列具有相同键的键值对以某种方式组合起来,输出处理后的键值对,输出结果会合并成一个文件。用户可以指定Reduce任务的个数(如n个),并通知实现系统,然后主控进程通常会选择一个Hash函数,Map任务输出的每个键都会经过Hash函数计算,并根据哈希结果将该键值对输入相应的Reduce任务来处理。对于处理键为k的Reduce任务的输入形式为<k,<v1,v2,...vn>>,输出为<k,V>。
2、MapReduce体系结构
2)、体系结构
Client:用户编写的MapReduce程序通过Client提交到JobTracker端
JobTracker(Resourcemanager):负责资源控制和作业调度;负责监控所有TaskTracker与Job的健康状况,一旦出现失败,就把相应的任务转移到其他节点;JobTracker会跟踪任务的执行进度、资源使用量等信息,并把这些信息告诉任务调度器(TaskTracker),而调度器会在资源出现空闲的时候,选择合适的任务去使用这些资源。
TaskTracker(nodemanager):会周期地通过“心跳”将本节点上的资源使用情况和任务运行进度汇报给JobTracker,同时接收jobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
TaskTracker使用”slot”(槽)等量划分节点上的资源量(CPU、本内存等)。一个Task获取到一个Slot后才有机会运行。而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为MapSlot和ReduceSlot,分别提供给mapTask和reduceTask使用。
Task:分为MapTask和ReduceTask两种,均有TaskTracker启动。
3)、工作流程
不同的Map/Reduce任务之间不会进行通信。
HDFS以固定大小的block为基本的单位存储数据 ,而对于MapReduce来讲,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。block与split是两个不同的东西,但是为了减少寻址开销,我们一般一个分片就是一个block大小(一个分片也可以是一个半block,但是如果两个块不在同一个机架上,那么我们就需要去寻找另一台机子),split的多少就决定了map任务的数目。而reduce任务的数目取决于集群中可用的reduce任务槽(slot),通常设置比reduce任务槽数目少一点点的reduce任务个数,预留系统资源处理可能发生的错误。
2、shuffle
在Hadoop这样的集群环境中,大部分的Map和Reduce任务是执行在不同节点上的,shuffle的过程就需要完整地从MapTask端拉取数据到Reduce端。在跨节点拉取数据的时候,我们当然希望能尽可能地减少对带宽的不必要消耗,减少IO对任务的影响。
1)、Map端的shuffle过程
每个Map任务分配有一个缓存,默认是100MB,当Map任务的输出达到了溢写比例(一般为0.8)时,就会发生溢写操作:把缓存中数据写入磁盘。在写入的时候,会根据数据所在区分开(分区),再排序,如果用户有定义合并操作,还会进行合并。map任务全部结束之前会进入归并操作,归并成一个大文件,写入本地磁盘。举个例子来了解合并combine和归并merge的区别,两个键值对< “a”,1>< “a”,1>,进行合并操作,会得到< “a”, 2>,归并得到:< “a”, <1,1>>
2)、Reduce端的shuffle过程
reduce任务通过RPC向JobTracker询问map任务是否已经完成,若完成,则领取数据,放入缓存,来自不同Map机器,先归并,多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的,再合并,写入磁盘。当Map操作中的数据不多时,Map操作不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce。
整个应用程序的执行过程如上:对于一个用户程序,主节点将为各个节点分配Map或者reduce任务,map节点一般从HDFS中获取数据,将其切分为分片,各节点读取各自的分片 ,计算后放入缓存,缓存渐渐满了后进行溢写操作(shuffle),溢写到本地磁盘上,Reduce节点再去远程读取数据,(shuffle)执行完操作后输出文件。
注:总结
MapReduce将复杂的,运行于大规模集群上的并行计算过程高度抽象到了两个函数:Map和Reduce。
整个过程可以概括为:Map映射,Reduce归约。
具体过程为:从分布式文件系统中读入数据,执行Map任务输出中间结果,通过shuffle将中间结果分区排序整理后
Reduce任务,执行reduce任务后再将结果写入分布式文件系统。在这个过程中,Shuffle阶段非常关键。
但是我们也可以看到,两个子阶段严重地降低了Hadoop的性能:1、map阶段产生的结果要写入磁盘,虽然提高了系统的可靠性,但是增加了IO开销。2、shuffle阶段采用的是HTTP远程拷贝Map节点上的结果数据。
3、shuffle map端源码
环形缓冲区:类型MapOutputBuffer的源码解析
属性:
private int partitions;
private Class<!-- <K> --> keyClass; //在job中定义的map输出端的key类型
private Class<!-- <V> --> valClass; //在job中定义的map输出端的value类型
private Serializer<!-- <K> --> keySerializer; //用来序列号key值,存储到buffer里
private Serializer<!-- <V> --> valSerializer; //用来序列化value值,存储到buffer里
// k/v accounting
private IntBuffer kvmeta; // 参考init方法内的赋值操作,实际上是对buffer进行包装成int类型的数组,提供存储元数据的界面
int kvstart; // 用来标记元数据的起始位置 (第一次的位置应该是倒数第四个int元素位置)
int kvend; // 元数据在int数组的结束位置
int kvindex; // 存储下一个元数据的起始位置
int equator; // 分隔点的位置
int bufstart; // kv对的起始位置
int bufend; // kv对的结束位置
int bufmark; // kv对的结束位置进行标记
int bufindex; // 下一个kv对的开始位置
int bufvoid; // 缓冲区的长度
byte[] kvbuffer; // 环形缓冲区
private final byte[] b0 = new byte[0];//校验是否到边界
//一个元数据由四部分组成[valstart,keystart,parition,vallen]--四部分统计是kv对的信息
private static final int VALSTART = 0; // valstart相对于kvindex的偏移量
private static final int KEYSTART = 1; // keystart元数据相对于kvindex的偏移量
private static final int PARTITION = 2; // parition元数据相对于kvindex的偏移量
private static final int VALLEN = 3; // vallen元数据相对于kvindex的偏移量
private static final int NMETA = 4; // 一个元数据占4个int
private static final int METASIZE = NMETA * 4; // 统计一个元数据实际的字节数目
final SpillThread spillThread = new SpillThread();//溢写线程
private FileSystem rfs;//溢写文件系统
//初始化函数
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
mapOutputFile = mapTask.getMapOutputFile();//此对象决定map输出文件的位置
sortPhase = mapTask.getSortPhase();//设置排序阶段
partitions = job.getNumReduceTasks();//获取分区个数
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();//溢写文件或最后的合并文件存储到本地
//获取溢写阈值
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
//获取指定的缓冲区大小,如果没有指定,使用默认值100
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
//在排序阶段默认使用快排方式。
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
int maxMemUsage = sortmb << 20;// 将缓冲区的单位转成MB
maxMemUsage -= maxMemUsage % METASIZE;//保证maxMenUsage是16个倍数
kvbuffer = new byte[maxMemUsage];// 创建环形缓冲区
bufvoid = kvbuffer.length;
//将字节数组buffer 转成int数组视图
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);// 定义分隔点,同时给kvindex进行赋值
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
spillInProgress = false;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start(); //启动溢写线程
while (!spillThreadRunning) { 进行轮询的方式询问
spillDone.await();
}
//收集函数:往环形缓冲区里存储
public synchronized void collect(K key, V value, final int partition
) throws IOException {
bufferRemaining -= METASIZE;// 80MB
if (bufferRemaining <= 0) { 如果不小与0,不用溢写
// start spill if the thread is not running and the soft limit has been
// reached
spillLock.lock();
try {
do {
if (!spillInProgress) {
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any collected; check latter, as it may
// be possible for metadata alignment to hit spill pcnt
startSpill();
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
//不溢写,收集kv对和kvmeta数据
try {
// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);//序列化key值,写入buffer中,bufindex值改变
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);//序列化value值,写入buffer中,bufindex值改变
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();//对kvindex进行标记
mapOutputRecordCounter.increment(1);//计数器
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));//计数:kv对的字节长度
// 写kv对的元数据信息
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// 移动kvindex,为下一个kvmeta做准备,移动了int数组的四个位置
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
//当前kv对较大(针对于80MB),会直接溢写到磁盘
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
MapTask源码
MapTask.run()-->
-->if (conf.getNumReduceTasks() == 0)
说明:获取reduceTask的个数,如果是0,将map阶段设置成100%
如果有reduceTask,将mapPhase设置(67%)
sortPhase设置(33%)
--> boolean useNewApi = job.getUseNewMapper();
说明:是否使用了新的api
-->initialize(job, getJobID(), reporter, useNewApi);
说明:修改运行状态为running,获取输出格式(TextOutputFormat)
获取输出路径
--> runNewMapper(job, splitMetaInfo, umbilical, reporter);
-->说明:获取任务上下文对象,获取实际Mapper对象,获取输入格式,构造当前任务的inputsplit以及
记录阅读器(使用NewTrackingRecordReader对RecordReader)
获取map输出收集器(实际上是一个RecordWriter,写到MapOutputBuffer对象)
将输入格式,记录写出器,分片信息等封装成MapContextImpl对象
再将MapContextImpl包装成WrappedMapper类型
-->input.initialize(split, mapperContext);(略)
-->mapper.run(mapperContext);
说明:运行Mapper里的run方法(实际上:参数对象为MapContextImpl)
Map输出跟踪
【MyMapper.class】context.write(yearT, temperature);//将数据写出
-->【WrappedMapper.class】write(KEYOUT key, VALUEOUT value)
-->【TaskInputOutputContextImpl.class】write(KEYOUT key, VALUEOUT value)
-->【NewOutputCollector.class】write(K key, V value)
说明:构建NewOutputCollector对象时,调用构造器
//给属性MapOutputCollector赋值MapOutputBuffer对象
collector = createSortingCollector(job, reporter);
//获取分区数
partitions = jobContext.getNumReduceTasks();
如果分区数大于1:
分区器,通过反射机制获取 参考如下:
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job)
【conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class)】
获取默认的HashPartitioner分区器
用户自定义的话,会从job身上获取自定义的分区器
如果分区数是1(默认情况)
不会采用MR提供的默认的分区器(如hashPartitioner),而是使用匿名内部类
创建一个分区器,返回索引0.
如果分区数是0. 不走shuffle,涉及不到分区情况。
-->【MapOutputBuffer.class】collect(K key, V value, final int partition)
说明:收集器调用collect方法,将kv对存储再环形缓冲区中
在收集map输出数据时,收集完之后,有可能做了多次溢写,还有可能收集到的所有数据不满足80M.
那么,不管什么情况,最后都会调用flush方法。
方法sortAndSpill()
说明:当满足kvbuffer里的阈值,或者是最后一次flush操作,都会调用sortAndSpill操作
//提前统计这次溢写是多少个字节长度:包含kv原始数据和分区的头信息
final long size = distanceTo(bufstart, bufend, bufvoid) +partitions * APPROX_HEADER_LENGTH;
//为这次溢写,创建一个溢写记录对象
final SpillRecord spillRec = new SpillRecord(partitions);
//为这次溢写,构造溢写文件路径,
final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);
//调用文件系统对象的create方法,创建溢写文件,返回流对象
out = rfs.create(filename);
//统计元数据在环形缓冲区的位置,并对其进行排序
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
说明:默认使用QuickSort排序方式,一定会调用MapOutputBuffer.compare()方法
和MapOutputBuffer.swap()方法
-->
//比较
public int compare(final int mi, final int mj) {
final int kvi = offsetFor(mi % maxRec);
final int kvj = offsetFor(mj % maxRec);
//取两个元数据的分区部分对应的索引值
final int kvip = kvmeta.get(kvi + PARTITION);
final int kvjp = kvmeta.get(kvj + PARTITION);
//如果分区不同,先按分区索引排序
if (kvip != kvjp) {
return kvip - kvjp;
}
// 分区是同一个时,通过元数据取key的原始数据,进行比较
return comparator.compare(kvbuffer,
kvmeta.get(kvi + KEYSTART),
kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
kvbuffer,
kvmeta.get(kvj + KEYSTART),
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
}
//交换
public void swap(final int mi, final int mj) {
//比较后,如果不符合排序规则,就进行交换。
//取两个元数据的偏移量
int iOff = (mi % maxRec) * METASIZE;
int jOff = (mj % maxRec) * METASIZE;
//进行交换
System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
}
//Spill溢写操作
//构建一个索引记录对象,目录是记录这次溢写的基本信息,如偏移量,长度等
final IndexRecord rec = new IndexRecord();
//构建了一个buffer,存储value的字节序列
final InMemValBytes value = new InMemValBytes();
//一个分区一个分区的进行遍历元数据
for (int i = 0; i < partitions; ++i) {
//获取开始溢写的偏移量
long segmentStart = out.getPos();
//加密或者不加密操作,重写构建流对象
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
//将partitionOut进行再次封装成writer流对象,进行溢写
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
//构建一个buffer,存储key的字节序列
DataInputBuffer key = new DataInputBuffer();
//遍历当前分区中的所有元数据
while (spindex < mend &&kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
//取出当前元数据
final int kvoff = offsetFor(spindex % maxRec);
//通过元数据取对应的原始数据kv对
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
//将key的值,写入buffer中
key.reset(kvbuffer, keystart, valstart - keystart);
//将value的值写buffer中。
getVBytesForOffset(kvoff, value);
writer.append(key, value);
//为下一个元数据做准备工作
++spindex;
调用 writer.close(); 将key和value对应的两个buffer写到溢写文件中
统计这次溢写的索引信息,封装到溢写记录(spillRec)对象里【spillRec.putIndex(rec, i);】
//条件:索引信息对应的缓存区,如果这个缓存区小于真正的索引信息量,就写到文件中
// 如果缓存区大于信息量,就不进行写操作(写索引文件)
将溢写对象里的信息写入一个索引文件中【spillRec.writeToFile(indexFilename, job);】
//为下一次溢写进行统计数字【这个数字与生成的溢写文件名称有关系】
++numSpills;
mergeParts()
说明:当最后一次溢写操作完成【一定是调用了MapOutputBuffer.flush方法】后。开始合并溢写文件
//统计所有溢写文件的字节总长度
long finalOutFileSize = 0;
//统计所有索引文件的字节总长度
long finalIndexFileSize = 0;
//创建一个numSpills长度的Path数组,存储所有溢写文件的路径
final Path[] filename = new Path[numSpills];
//使用for循环 获取所有溢写文件的路径存储到数组Path[]中,同时,累加所有溢写文件的字节长度
for(int i = 0; i < numSpills; i++) {
溢写文件的个数==1时,sameVolRename(filename[0],mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
将溢写文件名称重命名,当成最后一个文件。
当溢写文件个数不为1时
获取所有的索引文件 加载到indexCacheList(索引缓存列表)
在finalOutFileSize属性上再次添加partitions * APPROX_HEADER_LENGTH
在finalIndexFileSize 赋值为 partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
构建最终的输出文件路径和最终的索引文件路径
Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
Path finalIndexFile =mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//调用文件系统的create方法创建最终的文件,
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
当溢写文件为0时,也创建一个空文件,没有做任何写操作。
当溢写文件>1时,
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
//开始遍历多个文件的相同分区
for (int parts = 0; parts < partitions; parts++) {
// 在当此循环中,创建要合并的段(segment)
List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills);
//然后遍历当前分区的所有溢写文件,
for(int i = 0; i < numSpills; i++) {
//获取当前分区的索引记录
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
将一个溢写文件的当前分区封装成Segment对象。
Segment<K,V> s =new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,indexRecord.partLength, codec, true);
//添加到集合中
segmentList.add(i, s);
//下面三行代码,判断是否需要再次排序
int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//再次调用下面方法,进行二次排序
Merger.merge(job, rfs,keyClass, valClass, codec,segmentList, mergeFactor,new Path(mapId.toString()),job.getOutputKeyComparator(), reporter, sortSegments,null, spilledRecordsCounter, sortPhase.phase(),TaskType.MAP);
//将合并后的输出写入磁盘(往最终的文件里写)
long segmentStart = finalOut.getPos();
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector);
}
//关闭流,结束此分区的写入磁盘操作,进行下一个分区的写入操作
writer.close();
//所有分区结束写出操作后,关闭流
finalOut.close();
//删除所有的溢写文件
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
-----------------------------------------------------未完---------------------------------------------------
参考:
https://blog.csdn.net/Michael__One/article/details/86407995
https://blog.csdn.net/Michael__One/article/details/86499972
https://blog.csdn.net/yinglish_/article/details/75670398