一、Hadoop概述
- HDFS :分布式文件系统
- MapReduce : 分布式计算系统
- Yarn:分布式集群资源管理
1.高可用架构模型
文件系统核心模块:
- NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,一般都是使用两个实现HA高可用
- JournalNode:元数据信息管理进程,一般都是奇数个
- DataNode:从节点,用于数据的存储
数据计算核心模块:
- ResourceManager:Yarn平台的主节点,主要用于接收各种任务,通过两个ResourceManager,构建成高可用
- NodeManager:Yarn平台的从节点,主要用于处理ResourceManager分配的任务
2.HDFS应用场景
适合场景:
- 存储非常大的文件,需要高吞吐量,对延时没有要求
- 采用流式数据的访问方式:一次写入,多次读取
- 高容错性
- 为数据存储提供所需的扩展能力
不适合场景:
- 低延时的数据访问
- 大量小文件的存储。文件的元数据保存在NameNode的内存中,,整个文件系统文件数量会受限于NameNode的内存大小
- 多方读写,HDFS采用追加(append-only)的方式写入数据。不支持文件任意offset的修改。不支持多个写入器(writer)
3.HDFS的结构和组成
HDFS是主从结构,由四部分组成:
client:客户端
- 文件切分,文件上传HDFS时候client将文件切分成一个个Block,然后进行存储
- 与NameNode进行交互,获取文件信息
- 与DataNode交互,读取或者写入数据
NameNode:就是 master,它是一个主管、管理者
- 管理 HDFS 的名称空间
- 管理数据块(Block)映射信息
- 配置副本策略
- 处理客户端读写请求
DataNode:就是Slave。NameNode 下达命令,DataNode 执行实际的操作
- 存储实际的数据块
- 执行数据块的读/写操作
Secondary NameNode:并非 NameNode 的热备。当NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务。
- 辅助 NameNode,分担其工作量
- 定期合并 fsimage和fsedits,并推送给NameNode
- 在紧急情况下,可辅助恢复 NameNode
4.NameNode与DataNode
NameNode
- 对元数据信息进行管理,以及表中文件数据块的地址映射
- 文件操作:NameNode负责元数据的操作,DataNode负责处理文件内容的读写请求,数据流不经过NameNode,会询问NameNode跟哪个DataNode联系
- NameNode决定文件数据块存放到那些DataNode上,NameNode根据全局情况做出放置副本的决定
- 心跳机制:全权管理数据块的复制,周期性的接受心跳和块的状态报告信息
DataNode:
- 以数据块的形式存储HDFS文件
- Data Node响应HDFS客户端读写请求
- Data Node周期性向NameNode汇报心跳信息、数据块信息、缓存数据块信息
5.HDFS安全模式
安全模式是Hadoop的一种保护机制,用于保证集群中数据块的安全性,当集群启动的时候,会首先进入安全模式,会价差数据块的完整性。
加入我们设置副本个数是3,那么在datanode上应该有3个副本存在。加入只存在2个副本,那么比例就是2/3=0.666,而HDFS默认的副本率为0.999.因此系统会自动复制副本到其他dataNode,使得副本率不小于0.999。如果系统中有5个副本,超出了3个,系统也会删除多余的2个副本。
在安全模式状态下,文件系统只接受读取数据请求,不接受变更请求,在系统达到安全标准,HDFS自动离开安全模式。
二、HDFS的存储机制(读写流程)
写入过程:
- client发起文件上传请求,通过RPC(远程过程调用)与 NameNode 建立通讯,NameNode 检查目标文件是否已存在,,父目录是否存在,返回是否可以上传
- Client 请求第一个 block 该传输到哪些DataNode 服务器上
- NameNode 根据配置文件中指定的备份数量及机架感知原理进行文件分配, 返回可用的DataNode 的地址如: A, B, C
- Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline), A 收到请求会继续调用 B, 然后 B 调用 C, 将整个 pipeline 建立完成, 后逐级返回 client
- Client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存), 以packet 为单位(默认64K), A 收到一个 packet 就会传给 B, B 传给 C. A 每传一个 packet 会放入一个应答队列等待应答
- 数据被分割成一个个 packet 数据包在 pipeline 上依次传输, 在 pipeline 反方向上, 逐个发送 ack(命令正确应答), 最终由 pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client
- 当一个 block 传输完成之后, Client 再次请求 NameNode 上传第二个 block 到服务器
读取过程:
- Client向NameNode发起RPC请求,来确定请求文件block所在的位置;
- NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址; 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序。
- Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据;
- 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;
- 读取完一个 block 都会进行验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读。
- 最终读取来所有的 block 会合并成一个完整的最终文件
三、元数据辅助管理
1.Fsimage和Edits
edits:
- edits 存放了客户端最近一段时间的操作日志
- 客户端对 HDFS 进行写文件时会首先被记录在 edits 文件中
- edits 修改时元数据也会更新
fsimage:
- NameNode中关于元数据的镜像,称为检查点。fsimage存放了一份比较完整的元数据信息
- 因为 fsimage 是 NameNode 的完整的镜像, 如果每次都加载到内存,非常耗内存和CPU, 所以一般开始时对 NameNode 的操作都放在 edits 中
- fsimage 内容包含了 NameNode 管理下的所有 DataNode 文件及文件 block 及 block 所在的 DataNode 的元数据信息.
- 随着 edits 内容增大, 就需要在一定时间点和 fsimage 合并
2.SecondaryNameNode 工作机制:
- NameNode询问NameNode是否需要checkpoint。直接带回检查结果
- Secondary NameNode请求执checkpoint。
- SecondaryNameNode 从 NameNode 中获得 fsimage 和 editlog(通过http方式)
- Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
- 生成新的镜像文件fsimage.chkpoint
- 拷贝fsimage.chkpoint到namenode
- namenode将fsimage.chkpoint重新命名成fsimage
3.NameNode与SecondaryNameNode的区别和联系
区别:
- NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数
据块信息 - SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。
联系:
- SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编
辑日志(edits) - 在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode
恢复数据
4.NameNode挂了怎么办
- 一:将SecondartNameNode中数据拷贝到namenode存储数据的目录;
- 二:高可用的集群会有两个NameNode,一个挂掉,激活另一个
四、高可用机制
1.单点故障和脑裂
-
单点故障:一个程序或一个服务器由众多的模块组成,当其中一个出现故障使得整体不可用。解决方案:一主一备,同一时刻只有一个活跃的
-
脑裂:主备之间信息不通畅,导致或认为彼此都挂了或者都存活着,出现集群无主或者集群多主的情况。
2.问题解决
QJM/Qurom Journal Manager(强行翻译树日志管理),其基本原理就是用2N+1台JournalNode存储EditLog,每次写数据操作有>=N+1返回成功即认为该次写成功数据不会丢失
QJM如何避免脑裂的发生?如何保证主备之间的数据状态是同步的?
- zkfc监控NameNode软硬件的环境以及进程是否正常。两个NameNode的zkfc在zk集群中注册指定的节点,谁创建成功,谁对应的那台NameNode就是active,创建失败zkfc设置为监听节点。
- 如果active的NameNode出现了健康问题,断开了与zk的连接,之前创建的节点消失,触发监听,standby对应的zkfc会收到监听。
- 而此时standby的NameNode不会立即把自己切换为active状态,ssh(网络传输协议)到active的那台机器进行远程登录,进行补刀 kill -9 xxxx 。补刀后再把自己状态转成为active。
- 如果之前的active又修好了,启动之后首先去zk创建节点,但是节点已经存在了,将节点设置为监听节点并且把自己的状态设置为standby
- 客户端连接active nn 进行hdfs操作,首先把edtis log写入到jn集群中,共享操作日志。standby感知jn集群数据的变化,读取edits log重演操作记录,过半则认为成功, 使得元数据实时同步
3.高可用以及高可用组件
高可用是保证连续性的有效解决方案,一般由两个或两个以上的节点,分为活动节点(Active)及备用节点(Standby)。Active NameNode负责群集中的所有客户端操作,而Standby充当从服务器。Standby机器保持足够的状态以提供快速故障切换。DateNode需要确保同一时间有且只有一个NameNode能命令DataNode
高可用组件介绍:
FailoverController 主要包括三个组件: :
- HealthMonitor: 监控 NameNode 是否处于 unavailable 或 unhealthy 状态。
- ActiveStandbyElector: 监控 NameNode 在 ZK 中的状态。
- ZKFailoverController:订阅 HealthMonitor (健康监听器)ActiveStandbyElector(选举) 的事件,并管理 NameNode的状态,另外 zkfc 还负责解决脑裂问题
ZKFailoverController 主要职责:
- 健康监测:周期性的向它监控的 NameNode 发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么 zkfc 就会标记它处于一个不健康的状态
- 会话管理:如果 NN 是健康的,zkfc 就会在 zookeeper 中保持一个打开的会话,如果 NameNode 同时还是 Active 状态的,那么 zkfc 还会在 Zookeeper 中占有一个类型为短暂类型的 znode,当这个 NN 挂掉时,这个 znode 将会被删除,然后备用的NN 将会得到这把锁,升级为主 NN,同时标记状态为 Active
- 当宕机的 NN 新启动时,它会再次注册 zookeper,发现已经有 znode 锁了,便会自动变为 Standby 状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置 2 个 NN
- master 选举:通过在 zookeeper 中维持一个短暂类型的 znode,来实现抢占式的锁机制,从而判断那个 NameNode 为 Active 状态
- QJM/Qurom Journal Manager(强行翻译树日志管理),其基本原理就是用2N+1台JournalNode存储EditLog,每次写数据操作有>=N+1返回成功即分为该次写成功数据不会丢失了
client的事务性操作也对HA提供了支持:
- client首先把事务性的操作先记录在edits log中然后进行相关操作。如果操作返回成功,信息同步内存中。如果不成功,操作日志标记失败。就是要保证记录的日志与真实的数据变化是一致的。
- 元数据是放在内存中的,如果断电了,内存元数据消失,所以要定期持久化元数据信息,备份、快照。
- 为了保证上次持久化镜像文件之后和这次持久化之前出现问题,通过edits log记录发送的所有事务性操作。
在这种机制下,如果机器挂掉,如何恢复?
- 首先先恢复镜像文件
- 再按照日志中记录逐个重演一遍操作,直至数据恢复
五、联邦机制( Federation)
HDFS Federation是解决namenode内存瓶颈问题的水平横向扩展方案。
- Federation意味着在集群中将会有多个namenode/namespace。这些namenode之间是联合的,也就是说,他们之间相互独立且不需要互相协调,各自分工,管理自己的区域。
- 分布式的datanode被用作通用的数据块存储存储设备。每个datanode要向集群中所有的namenode注册,且周期性地向所有namenode发送心跳和块报告,并执行来自所有namenode的命令。
- 如果一个NameNode的内存过高,可以将大的文件目录移到另外一个NameNode上做管理.更重要的一点在于,这些NameNode是共享集
群中所有的DataNode的,它们还是在同一个集群内的。
总结:
- 多个NN共用一个集群里的存储资源,每个NN都可以单独对外提供服务。
- 每个NN都会定义一个存储池,有单独的id,每个DN都为所有存储池提供存储。
- DN会按照存储池id向其对应的NN汇报块信息,同时,DN会向所有NN汇报本地存储可用资源情况。
优点:
- 命名空间可伸缩性——联合添加命名空间水平扩展。DN也随着NN的加入而得到拓展。
- 性能——文件系统吞吐量不是受单个Namenode限制。添加更多的Namenode集群扩展文件系统读/写吞吐量。
- 隔离——隔离不同类型的程序,一定程度上控制资源的分配
六、MapReduce
MapReduce运行在yarn集群
- ResourceManager
- NodeManager
一个完整的mapreduce程序在分布式运行时有三类实例进程:
- MRAppMaster 负责整个程序的过程调度及状态协调
- MapTask 负责map阶段的整个数据处理流程
- ReduceTask 负责reduce阶段的整个数据处理流程
1.MapReduce编程规范
Map有两个阶段:
- 设置InputFormat类,将数据切分为Key-Value(K1和V1) 对, 输入到第二步
- 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果
Shuffle阶段四个步骤
- 对输出的 Key-Value 对进行分区
- 对不同分区的数据按照相同的 Key 排序
- (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
- 对数据进行分组, 相同 Key 的 Value 放入一个集合中
Reduce阶段两个步骤
- 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的
Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出 - 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
2.MapReduce流程图:
3.MapReduce排序和序列化
- 序列化是指把结构化对象转化为字节流
- 反序列化 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取的字节流转换为对象, 就要进行反序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息,不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
定义Bean对象想要序列化传输的注意事项:
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用
(7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序
FileInputFormat切片机制
- 简单的按照文件的内容长度进行切片
- 切片大小默认等于block块大小
- 切片时不考虑数据的整体性,而是针对每一个文件单独切片
自定义InputFormat流程
- 自定义一个类继承FileInputFormat
- 重写RecordReader方法
4.规约Combiner作用、使用场景、与reduce区别
Combiner是 MapReduce的一种优化手段之一。对每个MapTask的输出进行局部汇总,以减少网络传输量
Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer
的输入kv类型要对应起来
- combiner组件的父类就是Reducer
- combiner 和 reducer 的区别在于运行的位置
- Combiner 是在每一个 maptask 所在的节点运行
- Reducer 是接收全局所有 Mapper 的输出结果
5.MapTask工作机制
(1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
-
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序
-
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
-
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,
则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
6.ReduceTask工作机制
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果
其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上
的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一
组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已
经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序
即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
7.请描述mapReduce有几种排序及排序发生的阶段
1)排序的分类:
- (1)部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。 - (2)全排序:
如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。 - (3)辅助排序:(GroupingComparator分组)
Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。 - (4)二次排序:
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
2)自定义排序WritableComparable,bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序
3)排序发生的阶段:
(1)一个是在map side发生在spill后partition前。
(2)一个是在reduce side发生在copy后 reduce前
8.请描述mapReduce中shuffle阶段的工作流程,如何优化shuffle阶段
分区,排序,溢写,拷贝到对应reduce机器上,增加combiner,压缩溢写的文件
9.Mapreduce的工作原理,请举例子说明mapreduce是怎么运行的
10.如果没有定义partitioner,那数据在被送达reducer前是如何被分区的
如果没有自定义的 partitioning,则默认的 partition 算法,即根据每一条数据的 key
的 hashcode 值摸运算(%)reduce 的数量,得到的数字就是“分区号”。
11.如何决定一个job的map和reduce的数量
1)map数量
splitSize=max{minSize,min{maxSize,blockSize}}
map数量由处理的数据分成的block数量决定default_num = total_size / split_size;
2)reduce数量
reduce的数量job.setNumReduceTasks(x);x 为reduce的数量。不设置的话默认为 1
12.Maptask的个数由什么决定?
一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。
13.MapReduce 怎么实现TopN?
可以自定义groupingcomparator,或者在map端对数据进行排序,然后再reduce输出时,控制只输出前n个数。就达到了topn输出的目的。
14.有可能使 Hadoop 任务输出到多个目录中么?如果可以,怎么做
采用自定义OutputFormat
自定义outputformat
改写recordwriter,具体改写输出数据的方法write()