hdfs写入流程

一、简介

  HDFS(Hadoop Distributed File System)是GFS的开源实现。

1.1.优点:

  能够运行在廉价机器上,硬件出错常态,需要具备高容错性
  流式数据访问,而不是随机读写
  面向大规模数据集,能够进行批处理、能够横向扩展
  简单一致性模型,假定文件是一次写入、多次读取

1.2.缺点:

  不支持低延迟数据访问
  不适合大量小文件存储(因为每条元数据占用空间是一定的)
  不支持并发写入,一个文件只能有一个写入者
  不支持文件随机修改,仅支持追加写入

1.3.数据单位:

  block :文件上传前需要分块,这个块就是block,一般为128MB,可以修改,不过不推荐,因为块太小:寻址时间占比过高。块太大:Map任务数太少,作业执行速度变慢。它是最大的一个单位。

  packet :packet是第二大的单位,它是client端向DataNode,或DataNode的PipLine之间传数据的基本单位,默认64KB。

  chunk :chunk是最小的单位,它是client向DataNode,或DataNode的PipLine之间进行数据校验的基本单位,默认512Byte,因为用作校验,故每个chunk需要带有4Byte的校验位。所以实际每个chunk写入packet的大小为516Byte。由此可见真实数据与校验值数据的比值 约为128 : 1(即64*1024 / 512)。

二、HDFS写入流程

hdfs写入流程

hdfs写入的流程图二

hdfs写入流程

写入本地file文件,假设文件200M,则共有2个块,block1为128M(hdfs默认块大小为128M),block2为72M。默认三个副本。

      1、ClientNode向HDFS写入数据,先调用DistributedFileSystem的 create 方法获取FSDataOutputStream。

      2、DistributedFileSystem调用NameNode的 create 方法,发出文件创建请求。NameNode对待上传文件名称和路径做检验,如上传文件是否已存在同名目录,文件是否已经存在,递归创建文件的父目录(如不存在)等。并将操作记录在edits文件中。

      3、ClientNode调用FSDataOutputStream向输出流输出数据(假设先写block1)。

      4、FSDataOutputStream调用NameNode的 addBlock 方法申请block1的blockId和block要存储在哪几个DataNode(假设DataNode1,DataNode2和DataNode3)。若pipeline还没有建立,则根据位置信息建立pipeline。

       5、同返回的第一个DataNode节点DataNode1建立socket连接,向其发送package。同时,此package会保存一份到ackqueue确认队列中。写数据时先将数据写到一个校验块chunk中,写满512字节,对chunk计算校验和checksum值(4字节)。以带校验和的checksum为单位向本地缓存输出数据(本地缓存占9个chunk),本地缓存满了向package输入数据,一个package占64kb。当package写满后,将package写入dataqueue数据队列中。将package从dataqueue数据对列中取出,沿pipeline发送到DataNode1,DataNode1保存,然后将package发送到DataNode2,DataNode2保存,再向DataNode3发送package。DataNode3接收到package,然后保存。

        6、package到达DataNode3后做校验,将校验结果逆着pipeline回传给ClientNode。DataNode3将校验结果传给DataNode2,DataNode2做校验后将校验结果传给DataNode1,DataNode1做校验后将校验结果传给ClientNode。ClientNode根据校验结果判断,如果”成功“,则将ackqueue确认队列中的package删除;如果”失败“,则将ackqueue确认队列中的package取出,重新放入到dataqueue数据队列末尾,等待重新沿pipeline发送。

        7、当block1的所有package发送完毕。即DataNode1、DataNode2和DataNode3都存在block1的完整副本,则三个DataNode分别调用NameNode的 blockReceivedAndDeleted方法。NameNode会更新内存中DataNode和block的关系。ClientNode关闭同DataNode建立的pipeline。文件仍存在未发送的block2,则继续执行4。直到文件所有数据传输完成。

        8、全部数据输出完成,调用FSDataOutputStream的 close 方法。

        9、ClientNode调用NameNode的 complete 方法,通知NameNode全部数据输出完成。

如果还不懂,可以结合这张图从源码角度去看下整个写入流程(spark3.0的)

hdfs写入流程

图片来自网络

这张图就能很清晰的解释了ack机制,粗略理解可以这样理解:每个packet先写入dataQueue队列,当packet发往datanode的同时会写入一份到ackQueue作为备份,同时从dataQueue中移除,如果3个副本都写入成功了,就将ackQueue的packet移除,如果packet写入失败,那就将这个packet写回dataQueue的尾部,同时从ackQueue移除,这样就保证了数据的写入不丢失了,很多文字都写入的比较粗略,我这边整合了很多资料加上自己结合源码重新梳理了一下整个写流程,应该算是比较全面的了。

三、容错处理

        假设当前构建的pipeline是DataNode1、DataNode2和DataNode3。当数据传输过程中,DataNode2中断无法响应,则当前pipeline中断,需要重建。

       1、先将ackqueue中的所有package取出放回到dataqueue末尾。ClientNode调用NameNode的 updateBlockForPipeline 方法,为当前block生成新的版本,如ts1(本质是时间戳),然后将故障DataNode2从pipeline中删除。

       2、FSDataOutputStream调用NameNode的 getAdditionalDataNode 方法,由NameNode分配新的DataNode,假设是DataNode4。FSDataOutputStream把DataNode1、DataNode3和DataNode4建立新的pipeline,DataNode1和DataNode3上的block版本设置为ts1,通知DataNode1或DataNode3将block拷贝到DataNode4。

        3、新的pipeline创建好后,FSDataOutputStream调用NameNode的 updataPipeline 方法更新NameNode元数据。

       4、之后,按照正常的写入流程完成数据输出。后续,当DataNode2从故障中恢复。DataNode2向NameNode报送所有block信息,NameNode发现block为旧版本(非ts1),则通过DataNode2的心跳返回通知DataNode2将此旧版本的block删除。

四、读写过程,数据完整性如何保持?


       通过校验和。因为每个chunk中都有一个校验位,一个个chunk构成packet,一个个packet最终形成block,故可在block上求校验和。

       HDFS 的client端即实现了对 HDFS 文件内容的校验和 (checksum) 检查。当客户端创建一个新的HDFS文件时候,分块后会计算这个文件每个数据块的校验和,此校验和会以一个隐藏文件形式保存在同一个 HDFS 命名空间下。当client端从HDFS中读取文件内容后,它会检查分块时候计算出的校验和(隐藏文件里)和读取到的文件块中校验和是否匹配,如果不匹配,客户端可以选择从其他 Datanode 获取该数据块的副本。

HDFS中文件块目录结构具体格式如下:

${dfs.datanode.data.dir}/
├── current
│ ├── BP-526805057-127.0.0.1-1411980876842
│ │ └── current
│ │ ├── VERSION
│ │ ├── finalized
│ │ │ ├── blk_1073741825
│ │ │ ├── blk_1073741825_1001.meta
│ │ │ ├── blk_1073741826
│ │ │ └── blk_1073741826_1002.meta
│ │ └── rbw
│ └── VERSION
└── in_use.lock

in_use.lock表示DataNode正在对文件夹进行操作
rbw是“replica being written”的意思,该目录用于存储用户当前正在写入的数据。
Block元数据文件(*.meta)由一个包含版本、类型信息的头文件和一系列校验值组成。校验和也正是存在其中。

五、机架感知(副本节点的选择)

hdfs写入流程

以经典的 3 副本为例(黄色方框表示客户端,绿色表示要写的 block),Hadoop 早期版本采用左边的放置策略,后期版本采用右边的放置策略放置副本。

图左:

  • 副本 1:同机架的不同节点。

  • 副本 2:同机架的另一个节点。

  • 副本 3:不同机架的另一个节点。

  • 如果还有其他副本:随机。

图右:

  • 副本 1:同 Client 的节点。

  • 副本 2:不同机架的节点。

  • 副本 3:同第 2 副本相同机架的不同节点。

  • 如果还有其他副本:随机。

两种策略的故障域都为机架,新版相对于旧版本,当本客户端再次读取新写的数据时,直接从本地读取,这样延迟最小,读取速度最快。

六、DFSOutputStream内部原理

hdfs写入流程

6.1、创建Packet

  Client写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个Chunk大小(512B)时,便会创建一个Packet对象,然后向该Packet对象中写Chunk的Checksum校验和数据,以及实际数据块Chunk Data,校验和数据是基于实际数据块计算得到的。每次满足一个Chunk大小时,都会向Packet中写上述数据内容,直到达到一个Packet对象大小(64K),就会将该Packet对象放入到dataQueue队列中,等待DataStreamer线程取出并发送到DataNode节点。

6.2、发送Packet

   DataStreamer线程从dataQueue队列中取出Packet对象,放到ackQueue队列中,然后向DataNode节点发送这个Packet对象所对应的数据

6.3、接收ack

  发送一个Packet数据包以后,会有一个用来接收ack的ResponseProcessor线程,如果收到成功的ack,则表示一个Packet发送成功,ResponseProcessor线程会将ackQueue队列中对应的Packet删除,在发送过程中,如果发生错误,所有未完成的Packet都会从ackQueue队列中移除掉,然后重新创建一个新的Pipeline,排除掉出错的那些DataNode节点,接着DataStreamer线程继续从dataQueue队列中发送Packet。

纯属于个人通过搜索资料整理备忘,如有侵权,请联系删除!

参考资料:

https://www.cnblogs.com/feiyumo/p/12541296.html

https://baijiahao.baidu.com/s?id=1658688998882930549&wfr=spider&for=pc

https://www.infoq.cn/article/rudfbprmb5vwfpydus5r

上一篇:HDFS写数据流程


下一篇:Hadoop:HA模式配置与实现