39 Hadoop学习总结

HDFS相关

HDFS写数据的流程

  • 首先由客户端向NameNode服务发起写数据请求
  • NameNode收到请求后会进行基本验证
    • 验证类容包括对请求上传的路径进行合法验证
    • 对请求的用户进行权限验证
  • 验证没有问题后,NameNode会响应客户端允许上传
  • 接下来客户端会对文件按照blocksize大小进行切块,切完后依次以块为单位上传
  • 此时客户端会请求上传第一个块信息
  • 服务端接到上传请求后会依据HDFS默认机架感知原理,返回3台存放数据块副本的DataNode机器
  • 客户端收到机器列表后会依据网络拓扑原理找到其中一台机器进行传输通道的建立
  • 然后依次和三台机器进行串行连接
  • 这样的连接主要是为了减轻客户端本地IO的压力
  • 当通道建立成功后,客户端会通过HDFS的FSOutputStream流对象进行数据传输
  • 数据传输的最小单位为packet
  • 传输过程中每台DataNode服务器串行连接,依次将数据传递
  • 最后一个数据块被传输完成后相当于一次写入结束,如果还有数据块要传输,那就接着传输第二个数据块

HDFS读数据的流程

  • 和写数据一样,由客户端向NameNode发出请求
  • NameNode收到请求后会进行文件下载路径的合法性以及权限验证
  • 如果验证没问题,就会给客户端返回目标文件的元数据信息
  • 信息中包含目标文件数据块对应的DataNode的位置信息
  • 然后客户端根据具体的DataNode位置信息结合就近原则网络拓扑原理找到离自己最近的一台服务器对数据进行访问和下载
  • 最后通过HDFS提供的FSInputStream对象将数据读取到本地
  • 如果有多个块信息就会请求多次DataNode直到目标文件的全部数据被下载

HDFS的架构及每个服务的作用

  • HDFS是Hadoop架构中负责完成数据分布式存储管理的文件系统
  • 非高可用集群工作时会启动三个服务,分别是NameNode、DataNode以及SecondaryNameNode
  • 其中NameNode是HDFS的中心服务,主要维护管理文件系统中的文件的元数据信息
  • DataNode主要负责存储文件的真实数据块信息
  • DataNode的数据块信息中也包含一些关于当前数据块的元数据信息,如检验值,数据长度,时间戳等
  • 在非高可用HDFS集群中,NameNode和DataNode可以理解为是一对多的关系
  • 二者在集群中也要保存通信,通常默认3秒钟会检测一下心跳
  • 最后SecondaryNameNode的工作很单一,就是为了给NameNode的元数据映像文件和编辑日志进行合并,并自己也保留一份元数据信息,以防NameNode元数据丢失后有恢复的保障

HDFS中如何实现元数据的维护

  • NameNode的元数据信息是通过fsimage文件 + edits编辑日志来维护的
  • 当NameNode启动的时候fsimage文件和edits编辑日志的内容会被加载到内存中进行合并形成最新的元数据信息
  • 当我们对元数据进行操作的时候,考虑到直接修改文件的低效性,而不会直接修改fsimage文件
  • 而是会往edits编辑日志文件中追加操作记录
  • 当满足一定条件时,会让Secondary NameNode来完成fsimage文件和edits编辑日志文件的合并
  • Secondary NameNode首先会让NameNode停止对正在使用的edits编辑日志文件的使用,并重新生成一个新的edits编辑日志文件
  • 接着把NameNode的fsimage文件和已停止的edits文件拷贝到本地
  • 在内存中将edits编辑日志文件的操作记录合并到fsimage文件中形成一个最新的fsimage文件
  • 最后会将这个最新的fsimage文件推送给NameNode并自己也备份一份

NN和DN的关系,以及DN的工作流程

  • 从数据结构上看,就是一对多的关系
  • 一个HDFS集群中只能有一个NameNode用于维护元数据信息,同时会有多个DataNode用于存储真实的数据块
  • 当HDFS集群启动的时候,会首先进入到安全模式下
  • 在安全模式下我们只能对数据进行读取不能进行任何写操作
  • 此时集群的每一台DataNode会向NameNode注册自己
  • 注册成功后DataNode会上报自己的数据块详细信息
  • 当数据块汇报满足最小副本条件后,会自动退出安全模式
  • 此后DataNode和NameNode每三秒会通信一次,如果NameNode检测到DataNode没有响应,会继续检测
  • 一直到10分30秒后还没有检测到,就确定当前的DataNode不可用

MapReduce相关

手写MR的大概流程和规范

  • MR程序的结构可以分为3部分,一是程序的执行入口,通常简称为驱动类
  • 驱动类主要编写MR作业的提交流程以及自定义的一些配置项
  • 二是Map阶段核心类,需要自定义并继承Mappper类,重写Mapper中的map方法
  • 在map方法中编写自己的业务逻辑代码将数据处理后利用context上下文对象的写出落盘
  • 三是Reduce阶段的核心类,同时也需要继承Hadoop提供的Reducer类并重写reduce方法
  • 在reduce方法中编写自己的业务逻辑代码,处理完数据后通过context上下文对象将数据写出,这也就是最终的结果文件

如何实现Hadoop的序列化,Hadoop的序列化和Java的序列化有什么区别

  • 首先,序列化是把内存中的Java对象转化成二进制字节码,反序列化是将二进制字节码转化成Java对象
  • 通常我们在对Java对象进行磁盘持久化写入或将Java对象作为数据进行网络传输的时候需要进行序列化
  • 相反如果要将数据从磁盘读出并转化成Java对象需要进行反序列化
  • 实现Hadoop中的序列化需要让JavaBean对象实现Writable接口,并重写wirte()方法和readFields()方法
  • 其中wirte()是序列化方法,readFields()方法是反序列化方法
  • Hadoop序列化和Java序列化的区别在于,java序列化更重量级
  • Java序列化后的结果不仅仅生成二进制字节码文件,同时还会针对当前Java对象生成对应的检验信息以及集成体系结构
  • 这样的话,无形中我们需要维护更多的数据
  • 但是Hadoop序列化不会产生除了Java对象内部属性外的任何信息,整体内容更加简洁紧凑,读写速度相应也会提升很多,这也符合大数据的处理背景

MR程序的执行流程

  • MR程序执行先从InputFormat类说起,由InputFormat负责数据读入,并在内部实现切片
  • 每个切片的数据对应生成一个MapTask任务
  • MapTask中按照文件的行逐行数据进行处理,每一行数据会调用一次我们自定义的Mapper类的map方法
  • map方法内部实现具体的业务逻辑,处理完数据会通过context对象将数据写出到磁盘,接下来ReduceTask会开始执行
  • 首先ReduceTask会将MapTask处理完的数据结果拷贝过来
  • 每组相同key的values会调用一次我们自定义Reducer类的reduce方法
  • 当数据处理完成 后,会通过context对象将数据结果写出到磁盘上

InputFormat负责数据写份时候要进行切片,为什么切片大小默认是128M

  • 首先切片大小是可以通过修改配置参数来改变的,但默认情况下是和切块blocksize大小一致
  • 这样做的目的就是为了在读取数据的时候正好能一次性读取一个块的数据,避免了在集群环境下发生跨机器读取的情况
  • 如果跨机器读取会造成额外的网络IO,不利于MR程序执行效率的提升

描述一下切片的逻辑

  • MR中的切片是发生在数据读入的阶段中,所以我们要关注InputFormat的实现
  • 通过追溯源码,在InputFormat这个抽象类中有一个getSplits(),这个方法就是实现切片的具体逻辑
  • 首先关注两个变量,分别是minSize和maxSize,默认情况minSize = 1,maxSize = Long.MAX_VALUE
  • 源码中声明了一个集合List splits = new ArrayList(),用于装载将来的切片对象并返回
  • 接下来根据提交的job信息获取到当前要进行切片的文件详情
  • 首先判断当前文件是否可以进行切分,这一步主要考虑到一些不支持切分的压缩文件不能进行切片操作,否则就破坏了数据的完整性
  • 如果当前文件可以切片的话,就要计算切片的大小
  • 切片的大小一共需要三个因子,分别是minSize、maxSize、blocksize
  • 最后通过Math.max(minSize,Math.min(maxSize,blocksize)),计算逻辑获取到切片的大小
  • 默认情况下切片大小和数据块大小一致
  • 如果想要改变切片的大小可以通过修改mapreduce.input.fileinputformat.split.minsize(把切片调大)、mapreduce.input.fileinputformat.split.maxsize(把切片调小)两个参数实现
  • 获取到切片大小后继续往下执行,在最终完成切片之前还有一个关键判断
  • 就是判断剩余文件是否要进行切片

CombineTextInputFormat机制是怎么实现的

  • CombineTextInoutFormat是InputFormat的一个实现类,主要用于解决小文件场景
  • 大概思路是先在Job提交中指定使用InputFormat的实现类为CombineTextInputFormat
  • 接下来的切片过程中会先把当前文件的大小和设置的切片的最大值进行比较
  • 如果小于最大值,就单独划分成一块
  • 如果大于切片的最大值并小于两倍的切片的最大值,就把当前文件一分为二划分成两块
  • 以此类推逐个对文件进行处理,这个过程称之为虚拟过程
  • 最后生成真正的切片的时候,根据虚拟好的文件进行合并
  • 只要合并后文件大小不超过最开始设置好的切片的最大值那就继续追加合并
  • 直到达到设置好的切片的最大值
  • 此时就会产生一个切片,对应生成一个MapTask

Shuffle机制流程

  • 当MapTask执行完map()方法后通过context对象写数据的时候开始执行shuffle过程
  • 首先数据先从map端写入到环形缓冲区内
  • 写出的数据会根据分区规则进入到指定的分区,并且同时在内存中进行区内排序
  • 环形缓冲区默认大小为100M
  • 当数据写入的容量达到缓冲区大小的80%,数据开始向磁盘溢写
  • 如果数据很多的情况下,可能发生N次溢写
  • 这样在磁盘上就会产生多个溢写文件,并保证每个溢写文件中区内是有序的
  • 到此shuffle过程在Map端就完成了
  • 接着Map端输出的数据会作为Reduce端的数数据再次进行汇总操作
  • 此时ReduceTask任务会把每一个MapTask中计算完的相同的分区的数据拷贝到ReduceTask的内存中,如果内存放不下,开始写入磁盘
  • 再接着就是对数据进行归并排序,排序完还要根据相同的key进行分组
  • 将来一组相同的key对应的values调用一次reduce方法,如果有多个分区就会产生多个ReduceTask来处理,处理的逻辑都一样

MR程序中由谁来决定分区的数量,哪个阶段环节会开始往分区中写数据

  • 在Job提交的时候可以设置ReduceTask的数量
  • ReduceTask的数量决定分区的编号
  • 默认有多少ReduceTask任务就会产生多少个分区
  • 在Map阶段的map方法中通过context.wirte()往外写数据的时候其实就是在往指定的分区中写数据了

阐述MR中实现分区的思路

  • 默认情况下不指定分区数量就会有一个分区
  • 如果要指定分区,可以通过在Job提交的时候指定ReduceTask的数量来指定分区的数量
  • 从Map端处理完数据后,数据就会被溢写到指定的分区中
  • 决定kv数据究竟写到哪个分区中是通过Hadoop提供的Partitioner对象控制的
  • Partitioner对象默认实现HashPartitioner类
  • 它的规则就是用当前写出数据的key和ReduceTask的数量做取余操作,得到的结果就是当前数据要写入的分区的编号
  • 除此之外,我们也可以自定义分区器对象
  • 需要继承Hadoop提供的Partitioner对象,然后重写getPartitioner()方法
  • 在该方法中根据自己的业务实现分区编号的返回
  • 最后再将我们自定义的分区器对象设置到Job提交的代码中覆盖默认的分区规则

Hadoop中实现排序的两种方案分别是什么

  • 第一种方式是直接让参与比较的对象实现WritableComparable接口并指定泛型
  • 接下来实现CompareTo()方法,在该方法中实现比较规则即可
  • 第二种方式是自定义比较器对象,需要继承WritableComparator类,重写它的compare方法
  • 在构造器中调用父类对当前的要参与比较的对象进行实例化
  • 当前要参与比较的对象必须要实现WritableComparable接口
  • 最后在Job提交代码中将自定义的比较器对象设置到Job中就可以了

编写MR的时候什么情况下使用Combiner,实现的具体流程是什么

  • Combiner在MR中是一个可选流程,通常也是一种优化手段
  • 当我们执行完Map阶段的计算后数据量比较大,kv组合过多
  • 这样在Reduce阶段执行的时候会造成拷贝大量的数据以及汇总更多的数据
  • 为了减轻Reduce的压力,此时可以选择在Map阶段进行Combiner操作,将一些汇总工作提前进行

OutputFormat自定义实现流程

  • OutputFormat是MR中最后一个流程,它主要负责数据最终结果的写出
  • 如果对最终输出结果文件的名称或者输出路径有个性化需求,就可以通过自定义OutputFormat来实现
  • 首先自定义一个OutputFormat类,然后继承OutputFormat
  • 重写OutputFormat的getRecordWriter()方法,在该方法中返回RecordWriter对象
  • 由于RecordWriter是Hadoop内部对象,如果我们想实现自己的逻辑,还得自定义一个RecordWriter类,然后继承RecordWriter类
  • 重写该类中的write()方法和close()方法

MR实现MapJoin的思路,MapJoin的局限性是什么

  • Mapjoin解决了数据倾斜给Reduce阶段带来的问题
  • 首先MapJoin的前提就是我们需要join的两个文件一个是大文件,一个是小文件
  • 在此前提下,我们可以将小的文件提前缓存到内存中,然后让Map端直接处理大文件
  • 每处理一行数据就根据当前的关联字段到内存中获取想要的数据,然后将结果写出
上一篇:【题解】CF1479E School Clubs


下一篇:黄帝内经-第39篇-举痛论篇(1)