hadoop

hadoop

1.hadoop的组成

  • common公共组件
  • mapreduce负责业务逻辑运算
  • yarn负责资源的调度
  • HDFS负责文件的存储
1.1.HDFS
  • NameNode:储存文件的元数据,如文件名,文件的位置等等
  • DataNode:本地系统存储文件块数据
  • 2nn:每隔一段时间存储NameNode
1.2YARN
  • ResourceManager:整个集群的老大,可以进行资源调度
  • NodeManager:单个节点的管理者
  • ApplactionManager:单个任务的老大
  • Container:是一个容器,相当于一个独立的计算机,单个任务跑在Container中,每一个NodeManager可以有多个Container
  • 客户端可以有多个

2.hdfs详情

1.hdfs的写入流程
  • 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
  • NameNode返回可以上传
  • 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
  • NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3.
  • 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用
    dn2,然后 dn2 调用 dn3,将这个通信管道建立完成
  • dn1、dn2、dn3 逐级应答客户端。
  • 客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),
    以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet
    会放入一个应答队列等待应答
  • 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务
    器。(重复执行 3-7 步)。
2.hdfs的读取流程
  • 客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查
    询元数据,找到文件块所在的 DataNode 地址
  • 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
  • DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位
    来做校验)
  • 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
tips:
hadoop在物理上数据是分块处理的也就是block,3版本后就是128M;
Packet是client端向datanode,或者datanode之间传输的基本单位默认64kb
元数据:Data about Data,是描述数据的数据,主要储存文件属主,大小,权限,时间戳位置等信息。
3.namenode的工作机制
  • 一阶段NameNode启动

    • 第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
    • 客户端对元数据进行增删改的请求。
    • NameNode 记录操作日志,更新滚动日志。
    • NameNode 在内存中对元数据进行增删改。
  • 二阶段:Secondary NameNode 工作

    • Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode
      是否检查结果。

    • Secondary NameNode 请求执行 CheckPoint

    • NameNode 滚动正在写的 Edits 日志。

    • 将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。

    • Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。

    • 生成新的镜像文件 fsimage.chkpoint。

    • 拷贝 fsimage.chkpoint 到 NameNode。

    • NameNode 将 fsimage.chkpoint 重新命名成 fsimage。

4.datanode工作机制
  • 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据
    本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
  • DataNode 启动后向 NameNode 注册,通过后,周期性(6 小时)的向 NameNode 上
    报所有的块信息
  • 心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块
    数据到另一台机器,或删除某个数据块。如果超过 10 分钟没有收到某个 DataNode 的心跳,
    则认为该节点不可用。
  • 集群运行中可以安全加入和退出一些机器
5.hdfs常用命令

1.从本地上传到hdfs

hadoop fs -put <localPath> <hdfsPath> 

2.从hdfs下载到本地

hadoop fs -get <hdfsPath> <localPath>

3.显示目录信息

hadoop fs -ls <path>

4.显示文件内容

hadoop fs -cat <path>

5.创建路径

hadoop fs -mkdir <path>

6.复制

hadoop fs -cp <path> <targetPath>

7.移动

hadoop fs -mv <path> <targetPath>

8.删除

hadoop fs -rm -r <path>

9.统计文件夹的大小信息

 hadoop fs -du -s -h <path>

10设置hdfs的文件副本数量

hadoop fs -setrep <amount> <path> //不能超过节点数

3.MapReduce详情

1.MapReduce的核心思想
  • 分布式的运算程序往往需要分成至少 2 个阶段。
  • 第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
  • 第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段
    的所有 MapTask 并发实例的输出。
  • MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业
    务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
2.数据切片与MapTask的并行机制
2.1源码解析
  • )程序先找到你数据存储的目录。

  • )开始遍历处理(规划切片)目录下的每一个文件

  • )遍历第一个文件

    • 获取文件大小fs.sizeOf(ss.txt)
    • 计算切片大小 computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
    • 默认情况下,切片大小=blocksize
    • 开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M (每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
    • 将切片信息写到一个切片规划文件中
    • 整个切片的核心过程在getSplit()方法中完成
    • nputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
  • )提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。

2.2FileInputFormat切片机制

2.2.1: FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等,针对不同的数据类型用不同的实现类。

  • 简单地按照文件的内容长度进行切片
  • 切片大小,默认等于Block大小
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
2.3TextInputFormat
  • TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止
    符(换行符和回车符),Text 类型。
  • 按照文件切片,小文件太多的话效率极其低下。
2.4CombineTextInputFormat
  • CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到
    一个切片中,这样,多个小文件就可以交给一个 MapTask 处理
  • 虚拟存储切片最大值设置
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
  • 切片流程:将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不
    大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,
    那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时
    将文件均分成 2 个虚拟存储块(防止出现太小切片)。
2.5Partition 分区
  • 默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个 key存储到哪个分区

  • 可以自定义分区

    • )自定义类继承Partitioner,重写getPartition()方法

      public class CustomPartitioner extends Partitioner<Text, FlowBean> {
      @Override
      public int getPartition(Text key, FlowBean value, int numPartitions) { // 控制分区代码逻辑 … …
      return partition;
      }
      }
      
    • 在Job驱动中,设置自定义Partitioner

      job.setPartitionerClass(CustomPartitioner.class);
      
      
    • 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

      job.setNumReduceTasks(5);
       
      
2.6WritableComparable 排序
  • MapTask和ReduceTask均会对数据按照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是 否需要。

  • 自定义排序原理分析

    • bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可
      以实现排序。

      @Override public int compareTo(FlowBean bean) { 
       
       int result;     // 按照总流量大小,倒序排列  
          if (this.sumFlow > bean.getSumFlow()) { 
              result = -1; 
          }else if (this.sumFlow < bean.getSumFlow()) {  
              result = 1;  
          }else { 
              result = 0; 
          } 
       
       return result;
      }
      
2.7Combiner 合并
  • Combiner是MR程序中Mapper和Reducer之外的一种组件。
  • Combiner组件的父类就是Reducer。
  • Combiner和Reducer的区别在于运行的位置 Combiner是在每一个MapTask所在的节点运行;
  • Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv 应该跟Reducer的输入kv类型要对应起来。

4.Yarn

4.1Yarn组成
4.1.1ResourceManager
  • 处理客户请求
  • 监控NodeManager
  • 启动或者监控ApplicationManager
  • 资源的分配和调度
4.1.2NodeManager
  • 管理单个节点上的资源
  • 处理来自ResourceManager的命令
  • 处理来自ApplicationMaster的命令
4.1.3ApplicationMaster
  • 为应用程序申请资源并分配给内部任务
  • 任务的监控和容错
4.1.4Container
  • Container是yarn中的资源抽象,它封装了某个节点上的多维度资源,如内存、cpu、磁盘、网络等
4.2Yarn的工作机制

(1)MR 程序提交到客户端所在的节点。
(2)YarnRunner 向 ResourceManager 申请一个 Application。
(3)RM 将该应用程序的资源路径返回给 YarnRunner。
(4)该程序将运行所需资源提交到 HDFS 上。
(5)程序资源提交完毕后,申请运行 mrAppMaster。
(6)RM 将用户的请求初始化成一个 Task。
(7)其中一个 NodeManager 领取到 Task 任务。
(8)该 NodeManager 创建容器 Container,并产生 MRAppmaster

(9)Container 从 HDFS 上拷贝资源到本地。
(10)MRAppmaster 向 RM 申请运行 MapTask 资源。
(11)RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分
别领取任务并创建容器。
(12)MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager
分别启动 MapTask,MapTask 对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask 向 MapTask 获取相应分区的数据。
(15)程序运行完毕后,MR 会向 RM 申请注销自己

4.3Yarn常用命令

1.列出所有Application

- yarn application -list 

2.根据 Application 状态过滤:yarn application -list -appStates (所有状态:ALL、NEW、
NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED)

yarn application -list -appStates <ALL>

3.kill掉application

yarn application -kill <application-id>

4.查询application日志

yarn logs -application <Application-id>

5.查询container日志

yarn logs -application <Application-id> -containerId <Container-id>

6.查看所有application尝试运行的任务

yarn applicationattempt -list <ApplicationId> 

7.打印 ApplicationAttemp 状态

yarn applicationattempt -status <ApplicationAttemptId>

8.列出所有 Container

yarn container -list <ApplicationAttemptId> 

9.打印 Container 状态

 yarn container -status <ContainerId> 

10.列出所有节点

yarn node -list -all

11.加载队列配置

yarn rmadmin -refreshQueues 
  1. 打印队列信息
yarn queue -status <QueueName> 
上一篇:HDFS读数据流程


下一篇:十四、hadoop高可用