引言
Hadoop提供的HDFS布式文件存储系统,提供了基于thrift的客户端访问支持,但是因为Thrift自身的访问特点,在高并发的访问情况下,thrift自身结构可能将会成为HDFS文件存储系统的一个性能瓶颈。我们先来看一下一不使用Thrfit方式访问HDFS文件系统的业务流程。
一、HDFS文件读取流程
流程说明:
- 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
- Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的DataNode地址;
- 客户端开发库Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据.
- 读取完当前block的数据后,关闭与当前的DataNode连接,并为读取下一个block寻找最佳的DataNode;
- 当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
- 读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。
二、HDFS文件写入流程
流程说明:
- 使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
- Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;
- 当 客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以数据队列"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表, 列表的大小根据在Namenode中对replication的设置而定。
- 开始以pipeline(管道)的形式将packet写入所 有的replicas中。开发库把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此 pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。
- 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。
- 如 果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除, 剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持 replicas设定的数量。
三、关键词
HDFSClient通过文件IO操作最终实现是通过直接访问DataNode进行。
四、Thrift的访问流程:猜测版
流程说明:
1.ThriftClient客户端将操作命令传给ThriftServer。
2.ThriftServer调用HDFSClient接口API实现HDFS读写操作,操作流程如二和三所示。
五、疑问
与DataNode发生数据交换的到底是ThriftServer还是ThriftClient,如果是ThriftServer,那么多个ThriftClient并行访问时,ThriftServer必将成为HDFS访问的性能瓶颈;如果是ThriftClient直接访问DataNode,那么理论依据何在呢?
六、示例程序
下面是一个基于Thrift实现的HDFS客户端程序,实现了文件的访问和创建和读取
// HdfsDemo.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>
#include <transport/TTransportUtils.h>
#include "ThriftHadoopFileSystem.h"
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0500
#endif
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
int _tmain(int argc, _TCHAR* argv[])
{
if (argc < 3)
{
std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl;
//return -1;
}
boost::shared_ptr<TTransport> socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast<int>(argv[2])));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
ThriftHadoopFileSystemClient client(protocol);
try
{
transport->open();
Pathname path;
//01_create directory
path.__set_pathname("/user/hadoop");
if(client.exists(path) == true)
{
printf("path is exists.\r\n");
}
else
{
printf("path is not exists.");
//return 0;
}
//02_put file
Pathname filepath;
filepath.__set_pathname("/user/hadoop/in/test1.txt");
/*
FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb");
if (localfile == NULL)
{
transport->close();
return 0;
}
ThriftHandle hdl;
client.create(hdl,filepath);
while (true)
{
char data[1024];
memset(data,0x00,sizeof(data));
size_t Num = fread(data,1,1024,localfile);
if (Num <= 0)
{
break;
}
client.write(hdl,data);
}
fclose(localfile);
client.close(hdl);
*/
//03_get file
/*
ThriftHandle hd2;
FileStatus stat1;
client.open(hd2,filepath);
client.stat(stat1,filepath);
int index = 0;
while(true)
{
string data;
if (stat1.length <= index)
{
break;
}
client.read(data,hd2,index,1024);
index += data.length();
printf("==%s\r\n",data.c_str());
}
client.close(hd2);
*/
//04_list files
std::vector<FileStatus> vFileStatus;
client.listStatus(vFileStatus,path);
for (int i=0;i<vFileStatus.size();i++)
{
printf("i=%d file=%s\r\n",i,vFileStatus[i].path.c_str());
}
transport->close();
} catch (const TException &tx) {
std::cerr << "ERROR: " << tx.what() << std::endl;
}
getchar();
return 0;
}
七、源码分析
1.文件创建:
/**
* Create a file and open it for writing, delete file if it exists
*/
public ThriftHandle createFile(Pathname path,
short mode,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("create: " + path +
" permission: " + mode +
" overwrite: " + overwrite +
" bufferSize: " + bufferSize +
" replication: " + replication +
" blockSize: " + blockSize);
FSDataOutputStream out = fs.create(new Path(path.pathname),
new FsPermission(mode),
overwrite,
bufferSize,
replication,
blockSize,
null); // progress
long id = insert(out);
ThriftHandle obj = new ThriftHandle(id);
HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
return obj;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
ThriftHandle的两端到底是谁呢?是ThriftClient和DataNode?还是ThriftServer与DataNode?
2.文件写入
public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("write: " + tout.id);
FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
byte[] tmp = data.getBytes("UTF-8");
out.write(tmp, 0, tmp.length);
HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
return true;
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
写入时依赖的还是ThriftHandle?
3.文件读取
/**
* read from a file
*/
public String read(ThriftHandle tout, long offset,
int length) throws ThriftIOException {
try {
now = now();
HadoopThriftHandler.LOG.debug("read: " + tout.id +
" offset: " + offset +
" length: " + length);
FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
if (in.getPos() != offset) {
in.seek(offset);
}
byte[] tmp = new byte[length];
int numbytes = in.read(offset, tmp, 0, length);
HadoopThriftHandler.LOG.debug("read done: " + tout.id);
return new String(tmp, 0, numbytes, "UTF-8");
} catch (IOException e) {
throw new ThriftIOException(e.getMessage());
}
}
八、遗留问题
ThriftHandle可以看做是Socket连接句柄,但是他的两端到底是谁呢?如果是ThriftClient代表的客户端则一切OK,那么我该如何证明呢?存疑待考!