hadoop2.2原理:分析HDFS的文件读写

File Read

程序举例:

 public class FileRead {

     public static void main(Sting[] args) throws Exception {
       Configuration conf = new Configuration();
       FileSystem fs = FileSystem.get(conf);
       InputStream in = new InputStream();
       in = fs.open(new Path(args[0]));
       IOUtils.copyBytes(in, System.out, 4096, false);
       IOUtils.closeStream(in);
     }
 }  

过程分析:

(1)open

当client读取文件时,在FileSystem ojbect上调用open()方法,而FileSystem是HDFS的一个instance;

从上述程序中可见:

line5 得到FileSystem的instance

line7 调用FileSystem上的open()方法

(2)get block location

随后,HDFS通过RPC(Remote Procedure Call)来呼叫namenode, 来获得the locations of the blocks for the first few blocks, 对于每一个block, namenode会返回有此block的datanode 的 address, 而且datanode会根据Network Topology被重新排序;

获取locations以后,DFS会返回一个FSDataInputStream给client来读取数据, FSDataInputStream会依次wrap一个DFSInputStream来管理datanode和namenode的I/O,DFSInputStream上同时也存储了first few datanode 的address;

(3)read

client 在FSDataInputStream上call read(), 则DFSInputStream会连接存储第一个block的最近的datanode, 之后不停地call read()方法从datanode读取数据到client, 当到达block的末尾,DFSInputStream会关闭与此datanode的连接, 然后找到存储下一个block的datanode, 依次往复...

(4) DFSInputStream

DFSInputStream按顺序读入每一个packet的最近的一个block, 每读一个block都要重新和一个datanode建立连接;

DFSInputStream同时会和namenode保持连接,来重新获取下一个packet的blocks所在的datanode的locations

(5)FSDataInputStream

FSDataInputStream 是client和datanode连接的中介, client call read() methods 都通过FSDataInputStream来调用DFSInputStream

(6)容错

在读取数据的过程中遇到的错误主要有两类:

1. DFSInputStream和datanode 的communication出现错误, 此时DFSInputStream会尝试连接保存此packet的下一个block所在的datanode中最近的一个, 同时会记录此datanode, 防止读取下一个block是再次从该datanode上读取;

2. DFSInputStream checksum data from datanode时,发现损坏的数据块, 则它会在DFSInputStream尝试从另一个datanode读取此packet的下一个block副本之前报告给namenode;

File Write

程序举例:

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;

 public class CreateDir {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String dst = args[0];
     FileSystem fs = FileSystem.get(conf);
     fs.create(new Path(dst));
   }
 }

过程分析:

(1)create

当client 写入文件时, 在FileSystem object上调用create()方法, FileSystem是HDFS的一个instance;

(2) create new file in filesystem's namespace

DFS 通过Remote Procedure Call 来call namenode, namenode会在filesystem的namespace产生一个新的文件, 新建文件之前,nanemode会做一些列检查,包括client是否有create file的permission, 要create的文件是否已经存在, 若检查没有通过,则抛出IOException;

create file以后,DFS会返回一个FSDataOutputStream给client来写文件, FSDataInputStream会wrap一个DFSOutputStream来与namenode和datanode交流;

(3) client write data

client开始写入数据到文件时,DFSOutputStream会将待写入的数据split into 很多packets, 这些packet会被写入一个内部队列data queue, DFSOutputStream维护此data queue;

这个data queue将被DataStreamer所用,DataStreamer主要负责向namenode发出申请,来为新的packet的block副本分配合适的datanode, namenode会挑选出合适的datanodes来存储这些data blocks;

存储这个packet的blocks的datanode会组成一个pipeline, 假设每个packet的block的replication level是3, 则此pipeline由3个datanode组成。

DataStreamer将此packet导入pipeline的第一个datanode, 该datanode存储此packet之后forward it to 第二个datanode, 同样地,第二个datanode存储此packet, forward to 第三个datanode;

(4) 容错

从(3)中我们知道DFSOutputStream维护着一个data queue, 此外,

DFSOutputStream还维护一个ack queue(acknowledged), 当一个packet已经被所有在pipeline中的datanode acknowledged, 则,此packet将会从ack queue中移走;

如果写数据时,一个datanode 写入失败,则会发生以下动作:

首先, pipeline会关闭,所有在ack queue中的packets会被添加到data queue前面,以保证下游的datanode不会丢失任何packets, 当前已写入datanode的数据块会被标识, 而写入坏的datnode中的部分数据会在此datanode recover以后被删除;

failed的datanode将从pipeline中移走, namenode 会notice到这些,会重新分配一个datanode来组成新的pipeline; 下一个packet的block不会受到影响;

当写入一个block时若大量的datanode failed,只要满足dfs.replication.min(default is 1), 则此写入就不会失败,block会被复制并同步到cluster上的datanode中,知道达到dfs.replication所设的数目(默认是3)

(5)数据写入的收尾阶段

当client完成数据写入是,client会调用在FSDataOutputStream 上的close()。 此动作会在给namenode发送file完成写入的信号之前flushs所有剩余的packets到datanode pipeline并等待acknowledge;

因为DataStreamer之前曾为所有的packets向namenode申请过block locations, 故namenode已经知道此文件由哪些blocks组成。

上一篇:单源最短路径算法:迪杰斯特拉 (Dijkstra) 算法(二)


下一篇:JS 响应式编程