HMaster分析之Region的负载均衡实现(一)

        HBase中Region是表按行方向切分的一个个数据区域,由RegionServer负责管理,并向外提供数据读写服务。如果一个RegionServer上的Region过多,那么该RegionServer对应的就会承担过多的读写等服务请求,也就有可能在高并发访问的情况下,造成服务器性能下降甚至宕机。如此,RegionServer间Region的动态负载均衡,也就成了HBase实现高性能读写请求访问的一个需要解决的问题。那么,Region是如何在RegionServer间动态负载均衡的呢?

        在HMaster中,有几个成员变量定义如下:

  LoadBalancer balancer;// 实现Region动态负载均衡的实体
  private BalancerChore balancerChore;// 完成动态负载均衡的工作线程
  // Tracker for load balancer state
  // 加载balancer状态的跟踪器
  LoadBalancerTracker loadBalancerTracker;

        那么,这三个变量在HMaster启动时,是如何初始化的呢?

        在HMaster的构造函数中,有如下调用:

    // 开启Master的各种管理者
    startActiveMasterManager(infoPort);
        startActiveMasterManager()方法中,又有如下调用:

    finishActiveMasterInitialization(status);

        接下来,finishActiveMasterInitialization()方法中,开始了动态负载均衡中用到的各实体的初始化。代码如下:

    initializeZKBasedSystemTrackers();

    //initialize load balancer
    // 初始化balancer
    this.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);
    this.balancer.initialize();

    // 创建工作线程balancerChore,它会周期性的调用HMaster的balance()方法,调用周期为参数
    // hbase.balancer.period配置的值,未配置的话默认为5分钟
    this.balancerChore = new BalancerChore(this);
    Threads.setDaemonThreadRunning(balancerChore.getThread());

       而initializeZKBasedSystemTrackers()方法中,完成了三个相关组件的初始化,从而可以实现finishActiveMasterInitialization()方法中后面对balancer的设置。

    // 初始化LoadBalancer类型的成员变量balancer
    // 利用发射技术,优先加载hbase.master.loadbalancer.class参数配置的均衡器类,
    // 参数未配置再加载StochasticLoadBalancer
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    // 初始化LoadBalancerTracker类型的成员变量loadBalancerTracker
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    // 启动loadBalancerTracker
    this.loadBalancerTracker.start();

        首先,介绍下BalancerChore类型的成员变量balancerChore,它为一个后台线程,会周期性的调用HMaster的balance()方法,周期性的完成实际的Region的负载均衡。它的定义如下:

/**
 * Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when
 * needed.
 * 
 * 在需要的时候钓调用HMaster的balance()方法的工作线程。
 */
@InterfaceAudience.Private
public class BalancerChore extends Chore {
  private static final Log LOG = LogFactory.getLog(BalancerChore.class);

  private final HMaster master;

  // 构造方法,线程的名称为master的serverName + "-BalancerChore",
  // chore()方法循环调用周期为参数hbase.balancer.period,默认为5分钟
  public BalancerChore(HMaster master) {
    super(master.getServerName() + "-BalancerChore",
        master.getConfiguration().getInt("hbase.balancer.period", 300000),
        master);
    this.master = master;
  }

  /**
   * 线程的run()方法会周期性的调用chore()方法
   */
  @Override
  protected void chore() {
    try {
      // 调用HMaster的balance()方法,完成实际的Region动态负载均衡
      master.balance();
    } catch (IOException e) {
      LOG.error("Failed to balance.", e);
    }
  }
}

      它的父类Chore为HBase中的抽象线程类,HBase中很多工作线程都是继承自Chore,它的run()方法定义如下:

  /**
   * @see java.lang.Thread#run()
   */
  @Override
  public void run() {
    try {
      boolean initialChoreComplete = false;
      while (!this.stopper.isStopped()) {
    	// 开始时间
        long startTime = System.currentTimeMillis();
        try {
          // 如果是第一次循环,完成初始化工作
          if (!initialChoreComplete) {
            initialChoreComplete = initialChore();
          } else {
        	// 第一次后的每次循环,则周期性的调用chore()方法
            chore();
          }
        } catch (Exception e) {
          LOG.error("Caught exception", e);
          if (this.stopper.isStopped()) {
            continue;
          }
        }
        
        // 睡眠期睡眠一定的时间,然后再去调用chore()方法
        this.sleeper.sleep(startTime);
      }
    } catch (Throwable t) {
      LOG.fatal(getName() + "error", t);
    } finally {
      LOG.info(getName() + " exiting");
      cleanup();
    }
  }

    下面,我们再重点分析下HMaster的balance()方法,源码如下:

  public boolean balance() throws IOException {
    // if master not initialized, don't run balancer.
	// 如果master没有被初始化,不能运行balancer
    if (!this.initialized) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }
    // Do this call outside of synchronized block.
    int maximumBalanceTime = getBalancerCutoffTime();
    synchronized (this.balancer) {// 在this.balancer上同步
      // If balance not true, don't run balancer.
      // 从loadBalancerTracker处获取balancer是否已开启,如果没有,则返回false
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      // Only allow one balance run at at time.
      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
        Map<String, RegionState> regionsInTransition =
          this.assignmentManager.getRegionStates().getRegionsInTransition();
        LOG.debug("Not running balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
            abbreviate(regionsInTransition.toString(), 256));
        return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      // 获取表名->{ServerName->Region列表的映射集合}的映射集合assignmentsByTable
      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();

      List<RegionPlan> plans = new ArrayList<RegionPlan>();
      //Give the balancer the current cluster state.
      // 设置balancer中集群最新的状态
      this.balancer.setClusterStatus(getClusterStatus());
      
      // 循环assignmentsByTable中的value:每个表的ServerName->Region列表的映射集合
      for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
        if (partialPlans != null) plans.addAll(partialPlans);
      }
      long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
      
      // 移动的Region总个数
      int rpCount = 0;  // number of RegionPlans balanced so far
      
      // Region的移动计划总耗时
      long totalRegPlanExecTime = 0;
      
      // Region的移动计划不为空
      if (plans != null && !plans.isEmpty()) {
    	// 循环处理
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          // 开始时间
          long balStartTime = System.currentTimeMillis();
          //TODO: bulk assign
          // 调用assignmentManager的balance()方法执行计划
          this.assignmentManager.balance(plan);
          // 累加Region的移动计划总耗时
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          // 累加移动的Region总个数
          rpCount++;
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
        	  // 如果完成下一个balance的时间超过cutoffTime,退出循环
              // 这个完成时间是预估的,Region移动的平均耗时,用一个粗略的算法,已完成Region移动的总 耗时/已完成Region移动的总个数
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
      
      // 如果协处理器主机不为空,运行协处理器的钩子方法postBalance()
      if (this.cpHost != null) {
        try {
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating a success.
    return true;
  }

        balance()方法中,首先会利用HMaster中的成员变量assignmentManager,获取表名->{ServerName->Region列表的映射集合}的映射集合assignmentsByTable,如下:

      // 获取表名->{ServerName->Region列表的映射集合}的映射集合assignmentsByTable
      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();
        然后,通过上面提到的HMaster的成员变量balancer的balanceCluster()方法,获得Region的 移动计划列表,添加到数据结构List<RegionPlan>类型的plans中。如下:

      // 循环assignmentsByTable中的value:每个表的ServerName->Region列表的映射集合
      for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
        if (partialPlans != null) plans.addAll(partialPlans);
      }
        紧接着,循环处理这个移动计划列表plans,开始移动Region,如下:

      // Region的移动计划不为空
      if (plans != null && !plans.isEmpty()) {
    	// 循环处理
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          // 开始时间
          long balStartTime = System.currentTimeMillis();
          //TODO: bulk assign
          // 调用assignmentManager的balance()方法执行计划
          this.assignmentManager.balance(plan);
          // 累加Region的移动计划总耗时
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          // 累加移动的Region总个数
          rpCount++;
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
        	  // 如果完成下一个balance的时间超过cutoffTime,退出循环
              // 这个完成时间是预估的,Region移动的平均耗时,用一个粗略的算法,已完成Region移动的总 耗时/已完成Region移动的总个数
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
        移动计划的执行,实际上是由assignmentManager的balance()实现的,并且,在执行移动计划时,会根据以往执行过的计划的平均耗时是否超过一定阈值,来确定是继续此移动计划还是跳过转而执行下一个。

        最后,如果协处理器主机不为空,运行协处理器的钩子方法postBalance(),如下:

      // 如果协处理器主机不为空,运行协处理器的钩子方法postBalance()
      if (this.cpHost != null) {
        try {
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }

      那么,最关键的几个问题,就是:

     1、需要移动的Region是如何被选中,它又要被移动往哪里?

     2、Region移动的执行,具体的流程是怎么样的?

      且听下回分解~






上一篇:主流开源分布式图数据库 Benchmark


下一篇:HBase源码分析之HRegionServer上MemStore的flush处理流程(二)