Hadoop之HDFS原理及文件上传下载源码分析(上)

HDFS原理

  首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来。

  楼主的环境:

  •   操作系统:Ubuntu 15.10
  •   hadoop版本:2.7.3
  •   HA:否(随便搭了个伪分布式)

文件上传

下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程:

Hadoop之HDFS原理及文件上传下载源码分析(上)

  首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯。

  NameNode与各DataNode使用心跳机制来获取DataNode信息。NameNode收到Client请求后,获取DataNode信息,并将可存储文件的节点信息返回给Client。

  Client收到NameNode返回的信息,与对应的DataNode节点取得联系,并向该节点写文件。

  文件写入到DataNode后,以流水线的方式复制到其他DataNode(当然,这里面也有DataNode向NameNode申请block,这里不详细介绍),至于复制多少份,与所配置的hdfs-default.xml中的dfs.replication相关。

  元数据存储

  先明确几个概念:

  fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
  edits:操作日志文件。
  fstime:保存最近一次checkpoint的时间

  checkpoint可在hdfs-default.xml中具体配置,默认为3600秒:

 <property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>

  fsimage和edits文件在namenode目录可以看到:

Hadoop之HDFS原理及文件上传下载源码分析(上)

NameNode中的元数据信息:

  Hadoop之HDFS原理及文件上传下载源码分析(上)

  test.log文件上传后,Namenode始终在内存中保存metedata,用于处理“读请求”。metedata主要存储了文件名称(FileName),副本数量(replicas),分多少block存储(block-ids),分别存储在哪个节点上(id2host)等。

  到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回
  hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。此时Secondary namenode就派上用场了,合并fsimage和edits文件并更新NameNode的metedata。
  Secondary namenode工作流程:

  1. secondary通知namenode切换edits文件
  2. secondary通过http请求从namenode获得fsimage和edits文件
  3. secondary将fsimage载入内存,然后开始合并edits
  4. secondary将新的fsimage发回给namenode
  5. namenode用新的fsimage替换旧的fsimage

  通过一张图可以表示为:

Hadoop之HDFS原理及文件上传下载源码分析(上)

文件下载

  文件下载相对来说就简单一些了,如图所示,Client要从DataNode上,读取test.log文件。而test.log由block1和block2组成。

  Hadoop之HDFS原理及文件上传下载源码分析(上)

  文件下载的主要流程为:

  • client向namenode发送请求。
  • namenode查看Metadata信息,返回test.log的block的位置。

    Block1: h0,h1,h3
    Block2: h0,h2,h4

  • 开始从h0节点下载block1,block2。

源码分析

  我们先简单使用hadoop提供的API来实现文件的上传下载(文件删除、改名等操作比较简单,这里不演示):

  

 package cn.jon.hadoop.hdfs;

 import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test; public class HDFSDemo {
FileSystem fs = null;
@Before
public void init(){
try {
//初始化文件系统
fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
public static void main(String[] args) { }
@Test
/**
* 文件上传
*/
public void testFileUpload(){
try {
OutputStream os = fs.create(new Path("/test.log"));
FileInputStream fis = new FileInputStream("I://test.log");
IOUtils.copyBytes(fis, os, 2048,true);
//可以使用hadoop提供的简单方式
fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log"));
} catch (IllegalArgumentException | IOException e) {
e.printStackTrace();
}
}
@Test
/**
* 文件下载
*/
public void testFileDownload(){
try {
InputStream is = fs.open(new Path("/test.log"));
FileOutputStream fos = new FileOutputStream("E://test.log");
IOUtils.copyBytes(is, fos, 2048);
//可以使用hadoop提供的简单方式
fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log"));
} catch (IllegalArgumentException | IOException e) {
e.printStackTrace();
}
} }

  显而易见,只要是对hdfs上的文件进行操作,必须对FileSystem进行初始化,我们先来分析FileSystem的初始化:

  

  public static FileSystem get(URI uri, Configuration conf) throws IOException {
return CACHE.get(uri, conf);//部分方法我只截取了部分代码,这里进入get()方法
}
    FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);//调用getInternal()
}
 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
//使用单例模式创建FileSystem,这是由于FS的初始化需要大量的时间,使用单例保证只是第一次加载慢一些,返回FileSystem的子类实现DistributedFileSystem
FileSystem fs;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
} fs = createFileSystem(uri, conf);
synchronized (this) { // refetch the lock again
FileSystem oldfs = map.get(key);
if (oldfs != null) { // a file system is created while lock is releasing
fs.close(); // close the new file system
return oldfs; // return the old file system
} // now insert the new file system into the map
if (map.isEmpty()
&& !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
}
fs.key = key;
map.put(key, fs);
if (conf.getBoolean("fs.automatic.close", true)) {
toAutoClose.add(key);
}
return fs;
}
}
 public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf); String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
homeDirPrefix = conf.get(
DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT); this.dfs = new DFSClient(uri, conf, statistics);//实例化DFSClient,并将它作为DistributedFileSystem的引用,下面我们跟进去
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}
 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
//该构造太长,楼主只截取了重要部分给大家展示,有感兴趣的同学可以亲手进源码瞧瞧
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
//这里声明了NameNode的代理对象,跟我们前面讨论的rpc就息息相关了
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
} else if (rpcNamenode != null) {
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null,
"null URI");
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();//获取NameNode代理对象引用并自己持有,this.namenode类型为ClientProtocol,它是一个接口,我们看下这个接口
}
}
 public interface ClientProtocol{
public static final long versionID = 69L;
//还有很多对NameNode操作的方法申明,包括对文件上传,下载,删除等
//楼主特意把versionID贴出来了,这就跟我们写的RPCDemo中的MyBizable接口完全类似,所以说Client一旦拿到该接口实现类的代理对象(NameNodeRpcServer),Client就可以实现与NameNode的RPC通信,我们继续跟进
}
  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(conf, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
// 如果不是HA的创建方式,楼主环境是伪分布式,所以走这里,我们跟进去
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
} else {
// 如果有HA的创建方式
Conf config = new Conf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
config.maxRetryAttempts, config.failoverSleepBaseMillis,
config.failoverSleepMaxMillis));
return new ProxyAndInfo<T>(proxy, dtService,
NameNode.getAddress(nameNodeUri));
}
}

  最终返回的为ClientProtocol接口的子类代理对象,而NameNodeRpcServer类实现了ClientProtocol接口,因此返回的为NameNode的代理对象,当客户端拿到了NameNode的代理对象后,即与NameNode建立了RPC通信:

 private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是不是感觉越来越像我们前面说到的RPC final RetryPolicy defaultPolicy =
RetryUtils.getDefaultRetryPolicy(//加载默认策虐
conf,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
SafeModeException.class); final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
//看到versionId了吗?这下明白了rpc的使用中目标接口必须要有这个字段了吧
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
//看到没?这里使用 RPC.getProtocolProxy()来创建ClientNamenodeProtocolPB对象,调试时可以清楚的看见,该对象引用的是一个代理对象,值为$Proxy12,由JDK的动态代理来实现。
//前面我们写RPCDemo程序时,用的是RPC.getProxy(),但是各位大家可以去看RPC源码,RPC.getProtocolProxy()最终还是调用的getProxy()
if (withRetries) {
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
ClientProtocol translatorProxy =
new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol) RetryProxy.create(//这里再次使用代理模式对代理对象进行包装,也可以理解为装饰者模式
ClientProtocol.class,
new DefaultFailoverProxyProvider<ClientProtocol>(
ClientProtocol.class, translatorProxy),
methodNameToPolicyMap,
defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}

  整个FileSystem的初始化用时序图表示为:

Hadoop之HDFS原理及文件上传下载源码分析(上)

  到此,FileSystem的初始化就基本完成。由于文章篇幅过大的问题,所以楼主把HDFS原理及源码分析拆分成了两部分,上半部分主要是HDFS原理与FileSystem的初始化介绍,那在下半部分将会具体介绍HDFS文件上传、下载的源码解析。

  另外,文章用到的一些示例代码,将会在下半部分发布后,楼主一起上传到GitHub。

上一篇:Dirichlet's Theorem on Arithmetic Progression


下一篇:[LeetCode] Trapping Rain Water II 题解