Hadoop HDFS (3) JAVA訪问HDFS之二 文件分布式读写策略

先把上节未完毕的部分补全,再剖析一下HDFS读写文件的内部原理

列举文件

FileSystem(org.apache.hadoop.fs.FileSystem)的listStatus()方法能够列出一个文件夹下的内容。
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException;
public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException;
public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException;
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws FileNotFoundException, IOException;
这一组方法,都接收Path參数,假设Path是一个文件,返回值是一个数组,数组里仅仅有一个元素,是这个Path代表的文件的FileStatus对象;假设Path是一个文件夹,返回值数组是该文件夹下的全部文件和文件夹的FileStatus组成的数组,有可能是一个0长数组;假设參数是Path[],则返回值相当于多次调用单Path然后把返回值整合到一个数组里;假设參数中包括PathFilter,则PathFilter会对返回的文件或文件夹进行过滤,返回满足条件的文件或文件夹,条件由开发人员自己定义,使用方法与java.io.FileFilter相似。
以下这个程序接收一组paths,然后列出当中的FileStatus
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
public class ListStatus {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf); Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
} FileStatus[] status = fs.listStatus(paths);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
}
上传程序,然后运行:
$hadoop ListStatus / /user /user/norris
则列出/下,/user/下,/user/norris/下的全部文件和文件夹。
在Hadoop下运行程序的方法见上一篇博客(http://blog.csdn.net/norriszhang/article/details/39648857

File patterns 用通配符列出文件和文件夹

FileSystem的globStatus方法就是利用通配符来列出文件和文件夹的。glob就是通配的意思。
FileSystem支持的通配符有:
*:匹配0个或多个字符
?:匹配1个字符
[ab]:匹配方括号里列出的字符
[^ab]:匹配方括号里没有列出的字符
[a-b]:匹配方括号里列出的字符范围
[^a-b]:匹配方括号里列出的字符范围以外的字符
{a,b}:或者匹配a或者匹配b
\c:转义,假设c是一个元字符,就代表这个字符本身,比方\[,就表示字符[
public FileStatus[] globStatus(Path pathPattern) throws IOException;
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException;
尽管pathPattern非常强大,可是也有些情况不能满足,比方就是要排除某个特定文件,这时就须要使用PathFilter了。
package org.apache.hadoop.fs;
public interface PathFilter {
boolean accept(Path path);
}
这是PathFilter这个接口的定义,使用时仅仅要实现accept方法,返回是否选中该Path即可了。这个accept方法接收的參数是一个Path,也就是说,实用的信息差点儿仅仅能拿到路径和文件名称,像改动时间啊、权限啊、全部者呀、大小啊什么的,都拿不到,没有FileStatus那样强大,所以,假设我们希望按改动时间来选文件时,就要在文件的命名时带上时间戳了。(当然,通过FileSystem也能再次获得那些信息,可是。。。那也太不值当的了吧?我也不确定会有多大消耗)

删除文件

FileSystem的delete方法删除一个文件或文件夹(永久删除)。
public boolean delete(Path f, boolean recursive) throws IOException;
删除f,假设f是一个文件或空文件夹,则无论recursive传什么,都删除,假设f是一个非空文件夹,则recursive为true时文件夹下内容所有删除,假设recursive为false,不删除,并抛出IOException。

以下再深入剖析HDFS读写文件时的数据流向过程

读文件剖析

第一步:client通过调用FileSystem.open()方法打开一个文件,对于HDFS来讲,事实上是调用的DistributedFileSystem实例的open方法;
第二步:DistributedFileSystem通过远程方法调用(RPC)訪问namenode,获取该文件的前几个blocks所在的位置信息;针对每一个block,namenode都会返回有该block数据信息的全部datanodes节点,比方配置的dfs.replication为3,就会每一个block返回3个datanodes节点信息,这些节点是按距离client的远近排序的,假设发起读文件的client就在包括该block的datanode上,那该datanode就排第一位(这样的情况在做Map任务时常见),client就会从本机读取数据。关于怎样推断离client的距离远近的问题,一会儿的网络拓扑理论上会讲到。
DistributedFileSystem的open方法返回一个FSDataInputStream,FSDataInputStream里包装着一个DFSInputStream,DFSInputStream真正管理datanodes和namenode的I/O;
第三步:client调用FSDataInputStream.read()方法,FSDataInputStream里已经缓存了该文件前几个block所在的datanode的地址,于是从第一个block的第一个地址(也就是近期的datanode)開始连接读取;
第四步:重复调用read()方法,数据不断地从datanode流向client;
第五步:当一个block的数据读完了,DFSInputStream会关闭当前datanode的连接,打开下一个block所在的最优datanode的连接继续读取;这些对client是透明的,在client看来,就是在读一个连续的流;
第六步:这样一个block一个block读下去,当须要很多其它block的存储信息时,DFSInputStream会再次调用namenode,获取下一批block的存储位置信息,直到client停止读取,调用FSDataInputStream.close()方法,整个读取过程结束。

在读取过程中,假设DFSInputStream在与Datanode通信时发生了错误,它会试着向下一个近期的datanode节点获取当前block数据,DFSInputStream也会记录下错误发生的datanode节点,以便在以后block数据的读取时,不再去这些节点上尝试。
DFSInputStream在读取到datanode上的block数据后也会做checksum校验,假设checksum失败,它会先向namenode报告这台datanode上的数据有问题,然后再去尝试一下个存有当前block的datanode。
在这一整套的设计上,最重要的一点是:client在namenode的指引下,直接向最优datanode读取数据,这种设计让HDFS支持大规模的并发,由于client读取数据的流量分布在集群的每一个节点上,namenode仅仅是通过内存提供位置信息而不提供数据,假设client都通过namenode获得数据,那client的数量就大大受限制了。

Hadoop怎样决定哪个datanode离client近期

在网络上什么叫“近”?在大数据流动时,带宽是最稀缺的资源,因此,用两个节点之前的带宽来定义它们之间的距离非常合理。
在实践中,那么多节点,在每两个节点之间都測量带宽是不现实的,Hadoop採取了折中的方式,它把网络结构想象成一棵树,两个结点之间的距离就是两个节点分别向上找父、祖父、祖宗。。。直到两个节点有一个共同祖宗时,它们俩走的步数之和。没有人规定树必须有多少级,但通常的做法是分成“数据中心”、“机架”、“节点”三级,越排在前面的,之间通信带宽越小,比方两个数据中心之间通信要比同数据中心两个机架之间通信要慢,两个机架之间通信要比同机架的两个节点通信慢。所以,依照由快到慢各自是:
- 本机
- 同机架的两个节点
- 同数据中心不同机架的两个节点
- 不同数据中心的两个节点
假如用d表示数据中心,r表示机架,n表示节点,那/d1/r1/n1就表示1数据中心1机架上的1号节点。
- distance(/d1/r1/n1, /d1/r1/n1) = 0 //同一台机器
- distance(/d1/r1/n1, /d1/r1/n2)
= 2 //同一个机架上的两台机器,它们各自到共同父结点r1的步数都是1,因此距离是2
- distance(/d1/r1/n1, /d1/r2/n3)
= 4 //同数据中心的两个机架
- distance(/d1/r1/n1, /d2/r3/n4)
= 6 //不同的数据中心

最后,Hadoop是无法知道你的网络拓扑结构的,所以你得通过配置告诉它。默认情况下,它觉得全部的节点都是同一个机架上的节点,也就是随意两台之间的距离都是同样的。在小规模的集群中,这个默认配置就够用了,可是大的集群须要很多其它的配置,以后讲到集群配置时再说。

写文件剖析

文件是怎么被写进HDFS的呢?以下的介绍可能过于仔细了,可是这样才干有助于理解HDFS的数据一致性模型。
我们来讨论一下创建新文件,向里写数据,然后关闭文件的一个过程:
第一步:client调用DistributedFileSystem.create()方法创建一个文件;
第二步:DistributedFileSystem向namenode发起远程方法调用(RPC),创建一个文件,可是namenode没有把它关联到不论什么block上去;namenode在这一步做了非常多检查工作,保证该文件当前不存在,client有创建该文件的权限等。假设这些检查都通过了,namenode创建一条新文件记录,否则,创建失败,client返回IOException。DistributedFileSystem返回一个FSDataOutputStream,像读文件时一样,这个FSDataOutputStream里包装着一个DFSOutputStream,由它来实际处理与datanodes和namenode的通信。
第三步:client向DFSOutputStream里写数据,DFSOutputStream把数据分成包,丢进一个称为data queue的队列中,DataStreamer负责向namenode申请新的block,新的block被分配在了n(默认3)个节点上,这3个节点就形成一个管道。
第四步:DataStreamer把data queue里的包拿出来通过管道输送给第一个节点,第一个节点再通过管道输送给第二个节点,第二个再输送给第三个。
第五步:DFSOutputStream同一时候还在内部维护一个通知队列,名叫ack queue,里面是发过的数据包,一个包仅仅有被全部管道上的datanodes通知收到了,才会被移除。假设随意一个datanode接收失败了,首先,管道关闭,然后把ack queue里的包都放回到data
queue的头部,以便让失败节点下游节点不会丢失这些数据。当前已经成功接收数据了的节点将会经与namenode协商后分配一个新的标识,以便当坏节点以后恢复回来时能够把上面的不完整数据删除。然后打开管道把坏节点移出,数据会继续向其他好节点输送,直到管道上的节点都完毕了,这时事实上是少复制了一个节点,向namenode报告一下说如今这个block没有达到设定的副本数,然后就返回成功了,后期namenode会组织一个异步的任务,把副本数恢复到设定值。然后,接下来的数据包和数据块正常写入。以上操作,对client都是透明的,client不知道发生了这些事情,仅仅知道写文件成功了。
假设多个datanodes都失败了怎么办呢?hdfs-site.xml里有个配置dfs.replication.min,默认值是1,意思是仅仅要有1个datanode接收成功,就觉得数据写入成功了。client就会收到写入成功的返回。后期Hadoop会发起异步任务把副本数恢复到dfs.replication设置的值(默认3)。
第六步:当client完毕数据写入,调用流的close()方法,这个操作把data queue里的全部剩余的包都发给管道。
第七步:等全部包都收到了写成功的反馈,client通知namenode写文件完毕了。由于DataStream写文件前就先向namenode申请block的位置信息了,所以写文件完毕时,namenode早已知道每一个block都在哪了,它仅仅需等最小的副本数写成功,就能够返回成功了。

Namenode怎样选择一个block被写到哪几个节点上去?

Hadoop在这个算法上是做了权衡处理的。都写到同一个节点上,或者写在同一个机架的节点上,肯定是效率最高的,由于传输数据的带宽最大,但这就不是分布式冗余了,万一这个节点失败,或者这个机架掉电,这份数据就再也读不到了。当然任意写到三台机器上,最好分在不同的数据中心才最安全,可是那样又太损失效率了。即使是在同一个数据中心的节点上写,也有非常多种选择策略。从1.x版開始,这个策略就变成可插拔的了。
当前Hadoop默认的策略是:
第一份:假设client就执行在当前集群上,那第一个副本就存在当前节点上,假设client不执行在当前集群上,则随机选择第一个副本节点。当然这个随机是会考虑不要选已经有了非常多数据或当前正在处理非常大流量的datanode的;
第二份:选择与第一份不在同一个机架上的随机一个节点;
第三份:选择与第二份在同一个机架上的还有一个随机节点;
很多其它份:假设须要复制很多其它份,其它节点是随机选择的,仅仅是尽量分布在多个机架上,不让一个机架上有太多份副本。
*该书写作时Hadoop不支持跨数据中心部署,如今的版本号不知道是不是去掉了这个限制,假设是,那这个策略是不是也会考虑跨数据中心,临时还不清楚。
整体来看,这种策略平衡考虑了可靠性(数据分布在不同的机架上)、写带宽(仅仅有一次写须要跨机架)、读性能(读数据时有两上机架上的datanodes可选),数据分布在整个集群上。

数据一致性模型

对于文件系统来讲,所谓数据一致性模型,就是说一个写文件操作写进的数据,在什么时机能够被其他读文件的操作看到。HDFS在数据一致性方面做了平衡,因此可能不像本地文件系统那样,写进去的数据立即能够读到。
当一个文件被创建时,它是立即能够被看到的,可是当数据写进时,即使调用flush,读文件的操作也未必能看到,这个文件的长度可能还是0。(前一节讲文件写入时的progress回调时,我曾做了实验,一边往里写,非常次回调时睡1秒,然后还有一边不停看文件写进去多大了,结果发现一直是0,直到程序结束完毕了写入,才看到文件的真实大小,当时以为是没有flush,如今看来事实上是这样的特殊的数据一致性模型导致的。)
HDFS的数据一致性模型是以block为单位的,一个block被写完了,会看到一个block的数据,没写完一个block,就看不到这个block的数据。block 1写完了,其他读操作能看到这个block的内容,这时block 2正在写入,可是其他读操作却看不到,直到block
2完毕,開始写block 3,block 2的数据才干够被其他读操作看到。
HDFS提供了一个方法,FSDataOutputStrean.sync(),强制让当前已写入的数据对其他读操作可见。在1.x以后的版本号中,这个sync()方法被废弃了,改用hflush(),另外另一个hsync()方法,声明说是更强的保证数据一致性,但到写书时为止,hsync()方法没有被实现,仅仅是简单地调用了hflush()而已。
关闭文件时会非显示调用sync()方法,也就是被关闭了的文件,其所有数据都能够被其他读者看到了。

这一数据一致性模型对于应用程序是有影响的。应用程序的开发人员应该心里清楚,当写操作进行时假设读数据,或者当client或系统出现故障时,可能会有最多一个block的数据丢失。假设你的应用不能接受,那就要彩取适当的策略在适当的时候调用hflush()方法,可是频繁调用hflush会影响吞吐量,所以你要在程序健壮性和吞吐量双方面做出权衡,选择适当的调用hflush的频率。

用Flume和Sqoop导入数据

敲代码把数据放入HDFS,不如用已有的工具。由于如今已经有非常成熟的工具来完毕这件事,并且已经覆盖了大部分的需求。
Flume是Apache的大量数据移动的一个工具。当中一个典型的应用就是把Flume部署在web server的机器上,把web
server上的日志收集起来导入到HDFS。它同一时候也支持各种日志写入。
Sqoop也是Apache的工具,它用于把大量结构化数据批量导入HDFS,比方把关系型数据库里的数据导入到Hive里。Hive是执行在Hadoop上的数据仓库,后面章节讲到。
上一篇:WordPress 技巧:禁用 Emoji 功能


下一篇:oracle substr功能