HDFS datanode源码分析

datanode的介绍

一个典型的HDFS系统包括一个NameNode和多个DataNode。DataNode是hdfs文件系统中真正存储数据的节点。

每个DataNode周期性和唯一的NameNode通信,还时不时和hdfs客户端代码以及其他datanode通信。

 

datanode维护一个重要的表:

  块=>字节流

这些存储在本地磁盘,DataNode在启动时,还有启动后周期性报告给NameNode,这个表的内容。

DataNodes周期性请求NameNode询问命令操作,NameNode不能直接连接DataNode,NameNode在DataNode调用时,简单返回值。

DataNodes还维护一个开放的socket服务器,让客户端代码和其他DataNode通过它可以读写数据,这个服务器的host/port会汇报给NameNode。

 

datanode启动流程

在命令行启动datanode的方法是:bin/hadoop datanode

查看bin/hadoop脚本,可以看到最后执行的java类是:org.apache.hadoop.hdfs.server.datanode.DataNode

DataNode的骨架成员如下:

HDFS datanode源码分析
HDFS datanode源码分析
public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,Runnable, DataNodeMXBean {
  public DatanodeProtocol namenode = null;//与NameNode通信的ipc客户端类
  public FSDatasetInterface data = null;//管理一系列的数据块,每个块在本地磁盘上都有唯一的名字和扩展名。所有和数据块相关的操作,都在FSDataset相关的类中进行处理。
  public DatanodeRegistration dnRegistration = null;//DataNode向NameNode的注册信息,包含名字(datanode机器名:dfs.datanode.address端口),info的http端口,ipc的端口等

  volatile boolean shouldRun = true;//DataNode循环运行标志,为true就一直运行
  private LinkedList<Block> receivedBlockList = new LinkedList<Block>();//已经接收的数据块,定期通知namenode接收完毕时,会移除
  private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();//存放正在从本地块恢复到其他DataNode的数据块,恢复完毕后移除,在其他DataNode的数据块副本损坏或丢失时会使用
  private LinkedList<String> delHints = new LinkedList<String>(); //需要删除的块,一般是被替换时才会被删除,也是在定期通知namenode后,会移除

  Daemon dataXceiverServer = null;//用于读写数据的服务器,接收客户端和其他DataNode的请求,它不用于内部hadoop ipc机制,端口是dfs.datanode.address
  public Server ipcServer; //内部datanode调用的ipc服务器,用于客户端,端口是dfs.datanode.ipc.address
  
  long blockReportInterval;//数据块报告周期,默认是60*60秒,即一个小时
  long lastBlockReport = 0;//记录最近的数据块报告时间,与blockReportInterval联合使用


  long lastHeartbeat = 0;//记录最近和namenode的心跳时间
  long heartBeatInterval;//和namenode的心跳周期,默认是3s
  private DataStorage storage = null;//DataStorage提供了format方法,用于创建DataNode上的Storage,对DataNode的升级/回滚/提交过程,就是对DataStorage的doUpgrade/doRollback/doFinalize分析得到的。同时,利用StorageDirectory,DataStorage管理存储系统的状态。
  private HttpServer infoServer = null;//查看DataNode状态信息的http服务器,端口是dfs.datanode.http.address
  
  public DataBlockScanner blockScanner = null;//检测它所管理的所有Block数据块的一致性,因此,对已DataNode节点上的每一个Block,它都会每隔scanPeriod ms(默认三个星期)利用Block对应的校验和文件来检测该Block一次,看看这个Block的数据是否已经损坏。
  public Daemon blockScannerThread = null;
  
}
HDFS datanode源码分析
HDFS datanode源码分析

 

DataNode的初始化和启动:

HDFS datanode源码分析
HDFS datanode源码分析
public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,Runnable, DataNodeMXBean {
  //main方法,DataNode的入口点
  public static void main(String args[]) {
    secureMain(args, null);
  }
  
  //主线程阻塞,让DataNode的任务循环执行
  public static void secureMain(String [] args, SecureResources resources) {
    try {
      ...
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null)
        datanode.join();
    }
    ...
  }
  
  public static DataNode createDataNode(String args[],Configuration conf, SecureResources resources) throws IOException {
    DataNode dn = instantiateDataNode(args, conf, resources);
    runDatanodeDaemon(dn);//DataNode类作为一个Thread运行
    return dn;
  }
  
  public static DataNode instantiateDataNode(String args[],Configuration conf, SecureResources resources) throws IOException {
    ...
    String[] dataDirs = conf.getStrings(DATA_DIR_KEY);//获取DataNode管理的本地目录集合
    return makeInstance(dataDirs, conf, resources);
  }
  
  //检查本地目录集合的合法性
  public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException {
    ...
    ArrayList<File> dirs = new ArrayList<File>();
    FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION));
    for (String dir : dataDirs) {
      ...
        DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
        dirs.add(new File(dir));
      ...
    }
    if (dirs.size() > 0) 
      return new DataNode(conf, dirs, resources);
    return null;
  }
  
  //实例化DataNode
  DataNode(final Configuration conf,final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
    super(conf);
    ...
    try {
      startDataNode(conf, dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }   
  }
  
   void startDataNode(Configuration conf, AbstractList<File> dataDirs, SecureResources resources) throws IOException {
    
    InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);

    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);//获取DataNode的数据块流的读写的端口
    int tmpPort = socAddr.getPort();
    storage = new DataStorage();//管理数据目录的类,完成格式化,升级,回滚等功能
    // construct registration
    this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

    //与namenode通信的客户端类
    this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr, conf);
    //从NameNode获取版本和id信息
    NamespaceInfo nsInfo = handshake();

    if (simulatedFSDataset) {
        ...
    } else { // real storage
      // read storage info, lock data dirs and transition fs state if necessary
      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
      // adjust
      this.dnRegistration.setStorageInfo(storage);
      // initialize data node internal structure
      this.data = new FSDataset(storage, conf);//一切数据块读写的实际操作类
    }
      
    ...
    this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));//初始化数据块的流读写服务器
    ...
    //初始化数据块报告周期,默认是一个小时
    this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
    ...
    //初始化与namenode心跳周期,默认是3秒
    this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
    ...
    if ( reason == null ) {
      blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);//初始化数据块一致性检测类
    } 
    ...

    //DataNode的状态信息查询的http服务器地址
    InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
    ...
    //初始化DataNode的状态信息查询的http服务器
    this.infoServer = (secureResources == null) 
       ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, 
           conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))
       : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
           conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN),
           secureResources.getListener());
    ...
    //添加infoServer一些Servlet的映射url和类
    ...
    this.infoServer.start();
    ...
    //初始化内部hadoop ipc服务器
    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
        conf.get("dfs.datanode.ipc.address"));
    ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), 
        conf.getInt("dfs.datanode.handler.count", 3), false, conf,
        blockTokenSecretManager);
    dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
    ...
  }
HDFS datanode源码分析
HDFS datanode源码分析

 

DataNode的服务:

HDFS datanode源码分析
HDFS datanode源码分析
//运行DataNode的后台线程
  public static void runDatanodeDaemon(DataNode dn) throws IOException {
    if (dn != null) {
      //register datanode
      dn.register();
      dn.dataNodeThread = new Thread(dn, dnThreadName);
      dn.dataNodeThread.setDaemon(true); 
      dn.dataNodeThread.start();
    }
  }
  //启动数据块的流读写服务器,内部hadoop ipc服务器
  public void run() {
    ...
    dataXceiverServer.start();
    ipcServer.start();
        
    while (shouldRun) {
      try {
        startDistributedUpgradeIfNeeded();//检测是否需要升级hadoop文件系统
        offerService();//DataNode提供服务,定时发送心跳给NameNode,响应NameNode返回的命令并执行
      } 
      ...
    }
  }
  
  //DataNode提供服务,定时发送心跳给NameNode,响应NameNode返回的命令并执行,通知namenode接收完毕的数据块和删除的数据块,定时上报数据块
  public void offerService() throws Exception {
    ...
    while (shouldRun) {
      try {
        long startTime = now();
        ...
        if (startTime - lastHeartbeat > heartBeatInterval) {
          lastHeartbeat = startTime;
          //定期发送心跳给NameNode
          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());
          ...
          //响应namenode返回的命令做处理
          if (!processCommand(cmds))
            continue;
        }
        
        synchronized(receivedBlockList) {
          synchronized(delHints) {
              blockArray = receivedBlockList.toArray(new Block[numBlocks]);
              delHintArray = delHints.toArray(new String[numBlocks]);
            }
          }
        }
        if (blockArray != null) {
          //通知NameNode已经接收完毕的块,以及删除的块
          namenode.blockReceived(dnRegistration, blockArray, delHintArray);
          synchronized (receivedBlockList) {
            synchronized (delHints) {
              for(int i=0; i<blockArray.length; i++) {
                receivedBlockList.remove(blockArray[i]);//清空保存接收完毕的块
                delHints.remove(delHintArray[i]);//清空保存删除完毕的块
              }
            }
          }
        }
        
       if (startTime - lastBlockReport > blockReportInterval) {
          if (data.isAsyncBlockReportReady()) {
            // Create block report
            ...
            Block[] bReport = data.retrieveAsyncBlockReport();
            ...
            //向NameNode上报数据块信息
            DatanodeCommand cmd = namenode.blockReport(dnRegistration,
                    BlockListAsLongs.convertToArrayLongs(bReport));
            ...
            processCommand(cmd);
          } else {
            //请求异步准备好数据块上报信息
            data.requestAsyncBlockReport();
            ...
            }
          }
        }
        
    } // while (shouldRun)
  } // offerService
}
HDFS datanode源码分析
HDFS datanode源码分析

以上就是DataNode的启动流程和服务流程,都以作适当删减,留下主干,加上注释。

 

DataNode的相关重要类

FSDataset:所有和数据块相关的操作,都在FSDataset相关的类。详细分析参考 http://caibinbupt.iteye.com/blog/284365

DataXceiverServer:处理数据块的流读写的的服务器,处理逻辑由DataXceiver完成。详细分析参考 http://caibinbupt.iteye.com/blog/284979

DataXceiver:处理数据块的流读写的线程。详细分析参考 http://caibinbupt.iteye.com/blog/284979

                  还有处理非读写的非主流的流程。详细分析参考 http://caibinbupt.iteye.com/blog/286533

BlockReceiver:完成数据块的流写操作。详细分析参考 http://caibinbupt.iteye.com/blog/286259

BlockSender:完成数据块的流读操作。

DataBlockScanner:用于定时对数据块文件进行校验。详细分析参考http://caibinbupt.iteye.com/blog/286650

HDFS datanode源码分析

上一篇:Unity3D Physics.Raycast问题


下一篇:algorithm,ds,leftist heap