Hadoop-2.7.0中HDFS NameNode HA实现之DFSZKFailoverController、ZKFailoverController(一)

一、简介

      DFSZKFailoverController是Hadoop-2.7.0中HDFS NameNode HA实现的中心组件,它负责整体的故障转移控制等。它是一个守护进程,通过main()方法启动,继承自ZKFailoverController。

二、实现流程

      1、启动

       通过main()方法启动,如下:

  /**
   * 进程启动的main()方法
   */
  public static void main(String args[])
      throws Exception {
	  
	//  解析参数
    if (DFSUtil.parseHelpArgument(args, 
        ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    
    GenericOptionsParser parser = new GenericOptionsParser(
        new HdfsConfiguration(), args);
    
    // 通过静态方法创建DFSZKFailoverController实例zkfc
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(
        parser.getConfiguration());
    int retCode = 0;
    try {
      // 调用zkfc的run()方法,执行主业务逻辑
      retCode = zkfc.run(parser.getRemainingArgs());
    } catch (Throwable t) {
      LOG.fatal("Got a fatal error, exiting now", t);
    }
    System.exit(retCode);
  }
      解析参数,然后通过静态方法构造一个zkfc实例,调用zkfc的run()方法,执行主业务逻辑。

      2、实例化

      实例化是在静态方法create中完成的,如下:

  /**
   * 对象实例化用的静态方法
   */
  public static DFSZKFailoverController create(Configuration conf) {
    
	// 获取本地NameNode配置信息  
	Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
    // 获取该NameNode的命名服务ID:NamenodeNameServiceId
	String nsId = DFSUtil.getNamenodeNameServiceId(conf);

	// 检测是否支持HA,不支持的话直接抛出异常
    if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
      throw new HadoopIllegalArgumentException(
          "HA is not enabled for this namenode.");
    }
    
    // 获取NameNode ID,并校验
    String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
    if (nnId == null) {
      String msg = "Could not get the namenode ID of this node. " +
          "You may run zkfc on the node other than namenode.";
      throw new HadoopIllegalArgumentException(msg);
    }
    
    // NameNode初始化通用keys
    NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
    DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);
    
    // 构造NNHAServiceTarget实例localTarget,NNHAServiceTarget继承自HAServiceTarget,代表一个客户端HA管理命令的目标。其实就是封装了网络连接的相关地址参数
    NNHAServiceTarget localTarget = new NNHAServiceTarget(
        localNNConf, nsId, nnId);
    
    // 利用本地配置、localTarget构造DFSZKFailoverController
    return new DFSZKFailoverController(localNNConf, localTarget);
  }
      主要就是构造DFSZKFailoverController对象,并在此之前,通过配置信息,获取一些前置参数,比如:

      1)NameNode的命名服务ID:NamenodeNameServiceId;

      2)NameNode ID;

      3)构造NNHAServiceTarget实例localTarget,NNHAServiceTarget继承自HAServiceTarget,代表一个客户端HA管理命令的目标。其实就是封装了网络连接的相关地址参数,包括socket地址等;
      上述这些在ZooKeeper上注册节点时,是需要通过protobuf序列化后写入的节点数据的。

      3、运行

      运行时通过父类run()、doRun()方法实现的,如下:

  private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    try {
      // 初始化zookeeper,
      initZK();
    } catch (KeeperException ke) {
      LOG.fatal("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.");
      return ERR_CODE_NO_ZK;
    }
    if (args.length > 0) {
      if ("-formatZK".equals(args[0])) {
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
          if ("-force".equals(args[i])) {
            force = true;
          } else if ("-nonInteractive".equals(args[i])) {
            interactive = false;
          } else {
            badArg(args[i]);
          }
        }
        return formatZK(force, interactive);
      } else {
        badArg(args[0]);
      }
    }

    if (!elector.parentZNodeExists()) {
      LOG.fatal("Unable to start failover controller. "
          + "Parent znode does not exist.\n"
          + "Run with -formatZK flag to initialize ZooKeeper.");
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
          "You must configure a fencing method before using automatic " +
          "failover.", e);
      return ERR_CODE_NO_FENCER;
    }

    // 初始化rpc
    initRPC();
    
    // 初始化健康检查器
    initHM();
    
    // 启动rpc
    startRPC();
    
    // 主循环
    try {
      mainLoop();
    } finally {
      
      // 停止rpc服务
      rpcServer.stopAndJoin();
      
      // 选举器退出选举
      elector.quitElection(true);
      
      // 停止健康检查器
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }
      首先会做一些初始化操作,比如初始化zookeeper、初始化rpc、初始化健康监视器、启动rpc,然后进入主循环,当主循环退出时,再完成一些停止或关闭操作,比如:停止rpc服务、选举器退出选举、停止健康检查器等。

      (一)初始化

      1)zk初始化:

      主要是从配置信息中获取ZooKeeper连接参数,比如zkQuorum、zkTimeout、zkAcls、zkAuths、getParentZnode(ha根路径)、maxRetryNum等,然后创建选举器ActiveStandbyElector实例elector,并注册一个回调函数ElectorCallbacks。

      HA在ZooKeeper上的路径取自参数ha.zookeeper.parent-znode,默认为/hadoop-ha。

      关于回调函数的作用,在后续文章中单独介绍。

      2)rpc初始化与启动:

      rpc的初始化主要是构造一个ZKFCRpcServer对象,绑定socket地址,然后start()启动。

      3)健康监视器初始化与启动:

      健康监视器的初始化则是构造HealthMonitor对象,传入NN地址封装类HAServiceTarget实例localTarget,然后注册状态回调函数、注册服务状态回调函数,最后启动,如下:

  /**
   * 初始化健康监视器:HealthMonitor
   */
  private void initHM() {
	  
	// 构造健康监视器HealthMonitor实例
    healthMonitor = new HealthMonitor(conf, localTarget);
    
    // 注册状态回调函数
    healthMonitor.addCallback(new HealthCallbacks());
    
    // 注册服务状态回调函数
    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
    
    // 启动健康监视器
    healthMonitor.start();
  }
      这个状态回调函数和服务状态回调函数是在健康监视器有结果时调用的,主要是设置上次健康状态并复核选举可能性。

      (二)主循环

        主循环的逻辑比较简单,如下:

  private synchronized void mainLoop() throws InterruptedException {
    
	// 如果没有致命错误的话,一直等待  
	while (fatalError == null) {
      wait();
    }
    
	// 如果有致命错误,抛出运行时异常
	assert fatalError != null; // only get here on fatal
    throw new RuntimeException(
        "ZK Failover Controller failed: " + fatalError);
  }
      1)如果没有致命错误的话,一直调用wait()进行等待 ;
      2)如果有致命错误,抛出运行时异常。

      (三)停止

        停止比较简单,调用一些列方法实现即可。


      剩余的回调函数、复核选举可能性等细节请关注后续文章。


上一篇:Oracle 11g R2 配置管理


下一篇:Git分支的新建与合并实战