hadoop
1.hadoop的组成
- common公共组件
- mapreduce负责业务逻辑运算
- yarn负责资源的调度
- HDFS负责文件的存储
1.1.HDFS
- NameNode:储存文件的元数据,如文件名,文件的位置等等
- DataNode:本地系统存储文件块数据
- 2nn:每隔一段时间存储NameNode
1.2YARN
- ResourceManager:整个集群的老大,可以进行资源调度
- NodeManager:单个节点的管理者
- ApplactionManager:单个任务的老大
- Container:是一个容器,相当于一个独立的计算机,单个任务跑在Container中,每一个NodeManager可以有多个Container
- 客户端可以有多个
2.hdfs详情
1.hdfs的写入流程
- 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
- NameNode返回可以上传
- 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
- NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3.
- 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用
dn2,然后 dn2 调用 dn3,将这个通信管道建立完成 - dn1、dn2、dn3 逐级应答客户端。
- 客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),
以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet
会放入一个应答队列等待应答 - 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务
器。(重复执行 3-7 步)。
2.hdfs的读取流程
- 客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查
询元数据,找到文件块所在的 DataNode 地址 - 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
- DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位
来做校验) - 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
tips:
hadoop在物理上数据是分块处理的也就是block,3版本后就是128M;
Packet是client端向datanode,或者datanode之间传输的基本单位默认64kb
元数据:Data about Data,是描述数据的数据,主要储存文件属主,大小,权限,时间戳位置等信息。
3.namenode的工作机制
-
一阶段NameNode启动
- 第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
- 客户端对元数据进行增删改的请求。
- NameNode 记录操作日志,更新滚动日志。
- NameNode 在内存中对元数据进行增删改。
-
二阶段:Secondary NameNode 工作
-
Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode
是否检查结果。 -
Secondary NameNode 请求执行 CheckPoint
-
NameNode 滚动正在写的 Edits 日志。
-
将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
-
Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
-
生成新的镜像文件 fsimage.chkpoint。
-
拷贝 fsimage.chkpoint 到 NameNode。
-
NameNode 将 fsimage.chkpoint 重新命名成 fsimage。
-
4.datanode工作机制
- 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据
本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。 - DataNode 启动后向 NameNode 注册,通过后,周期性(6 小时)的向 NameNode 上
报所有的块信息 - 心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块
数据到另一台机器,或删除某个数据块。如果超过 10 分钟没有收到某个 DataNode 的心跳,
则认为该节点不可用。 - 集群运行中可以安全加入和退出一些机器
5.hdfs常用命令
1.从本地上传到hdfs
hadoop fs -put <localPath> <hdfsPath>
2.从hdfs下载到本地
hadoop fs -get <hdfsPath> <localPath>
3.显示目录信息
hadoop fs -ls <path>
4.显示文件内容
hadoop fs -cat <path>
5.创建路径
hadoop fs -mkdir <path>
6.复制
hadoop fs -cp <path> <targetPath>
7.移动
hadoop fs -mv <path> <targetPath>
8.删除
hadoop fs -rm -r <path>
9.统计文件夹的大小信息
hadoop fs -du -s -h <path>
10设置hdfs的文件副本数量
hadoop fs -setrep <amount> <path> //不能超过节点数
3.MapReduce详情
1.MapReduce的核心思想
- 分布式的运算程序往往需要分成至少 2 个阶段。
- 第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
- 第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段
的所有 MapTask 并发实例的输出。 - MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业
务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
2.数据切片与MapTask的并行机制
2.1源码解析
-
)程序先找到你数据存储的目录。
-
)开始遍历处理(规划切片)目录下的每一个文件
-
)遍历第一个文件
- 获取文件大小fs.sizeOf(ss.txt)
- 计算切片大小 computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
- 默认情况下,切片大小=blocksize
- 开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M (每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
- 将切片信息写到一个切片规划文件中
- 整个切片的核心过程在getSplit()方法中完成
- nputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
-
)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
2.2FileInputFormat切片机制
2.2.1: FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等,针对不同的数据类型用不同的实现类。
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
2.3TextInputFormat
- TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止
符(换行符和回车符),Text 类型。 - 按照文件切片,小文件太多的话效率极其低下。
2.4CombineTextInputFormat
- CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到
一个切片中,这样,多个小文件就可以交给一个 MapTask 处理 - 虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m - 切片流程:将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不
大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,
那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时
将文件均分成 2 个虚拟存储块(防止出现太小切片)。
2.5Partition 分区
-
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个 key存储到哪个分区
-
可以自定义分区
-
)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 控制分区代码逻辑 … … return partition; } }
-
在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
-
自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
-
2.6WritableComparable 排序
-
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是 否需要。
-
自定义排序原理分析
-
bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可
以实现排序。@Override public int compareTo(FlowBean bean) { int result; // 按照总流量大小,倒序排列 if (this.sumFlow > bean.getSumFlow()) { result = -1; }else if (this.sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; }
-
2.7Combiner 合并
- Combiner是MR程序中Mapper和Reducer之外的一种组件。
- Combiner组件的父类就是Reducer。
- Combiner和Reducer的区别在于运行的位置 Combiner是在每一个MapTask所在的节点运行;
- Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
- Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv 应该跟Reducer的输入kv类型要对应起来。
4.Yarn
4.1Yarn组成
4.1.1ResourceManager
- 处理客户请求
- 监控NodeManager
- 启动或者监控ApplicationManager
- 资源的分配和调度
4.1.2NodeManager
- 管理单个节点上的资源
- 处理来自ResourceManager的命令
- 处理来自ApplicationMaster的命令
4.1.3ApplicationMaster
- 为应用程序申请资源并分配给内部任务
- 任务的监控和容错
4.1.4Container
- Container是yarn中的资源抽象,它封装了某个节点上的多维度资源,如内存、cpu、磁盘、网络等
4.2Yarn的工作机制
(1)MR 程序提交到客户端所在的节点。
(2)YarnRunner 向 ResourceManager 申请一个 Application。
(3)RM 将该应用程序的资源路径返回给 YarnRunner。
(4)该程序将运行所需资源提交到 HDFS 上。
(5)程序资源提交完毕后,申请运行 mrAppMaster。
(6)RM 将用户的请求初始化成一个 Task。
(7)其中一个 NodeManager 领取到 Task 任务。
(8)该 NodeManager 创建容器 Container,并产生 MRAppmaster
(9)Container 从 HDFS 上拷贝资源到本地。
(10)MRAppmaster 向 RM 申请运行 MapTask 资源。
(11)RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分
别领取任务并创建容器。
(12)MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager
分别启动 MapTask,MapTask 对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask 向 MapTask 获取相应分区的数据。
(15)程序运行完毕后,MR 会向 RM 申请注销自己
4.3Yarn常用命令
1.列出所有Application
- yarn application -list
2.根据 Application 状态过滤:yarn application -list -appStates (所有状态:ALL、NEW、
NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED)
yarn application -list -appStates <ALL>
3.kill掉application
yarn application -kill <application-id>
4.查询application日志
yarn logs -application <Application-id>
5.查询container日志
yarn logs -application <Application-id> -containerId <Container-id>
6.查看所有application尝试运行的任务
yarn applicationattempt -list <ApplicationId>
7.打印 ApplicationAttemp 状态
yarn applicationattempt -status <ApplicationAttemptId>
8.列出所有 Container
yarn container -list <ApplicationAttemptId>
9.打印 Container 状态
yarn container -status <ContainerId>
10.列出所有节点
yarn node -list -all
11.加载队列配置
yarn rmadmin -refreshQueues
- 打印队列信息
yarn queue -status <QueueName>