LoadBalancer balancer;// 实现Region动态负载均衡的实体 private BalancerChore balancerChore;// 完成动态负载均衡的工作线程
// Tracker for load balancer state // 加载balancer状态的跟踪器 LoadBalancerTracker loadBalancerTracker;
// 开启Master的各种管理者 startActiveMasterManager(infoPort);startActiveMasterManager()方法中,又有如下调用:
//initialize load balancer // 初始化balancer this.balancer.setClusterStatus(getClusterStatus()); this.balancer.setMasterServices(this); this.balancer.initialize();
// 创建工作线程balancerChore,它会周期性的调用HMaster的balance()方法,调用周期为参数 // hbase.balancer.period配置的值,未配置的话默认为5分钟 this.balancerChore = new BalancerChore(this); Threads.setDaemonThreadRunning(balancerChore.getThread());
// 初始化LoadBalancer类型的成员变量balancer // 利用发射技术,优先加载hbase.master.loadbalancer.class参数配置的均衡器类, // 参数未配置再加载StochasticLoadBalancer this.balancer = LoadBalancerFactory.getLoadBalancer(conf); // 初始化LoadBalancerTracker类型的成员变量loadBalancerTracker this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); // 启动loadBalancerTracker this.loadBalancerTracker.start();
/** * Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when * needed. * * 在需要的时候钓调用HMaster的balance()方法的工作线程。 */ @InterfaceAudience.Private public class BalancerChore extends Chore { private static final Log LOG = LogFactory.getLog(BalancerChore.class); private final HMaster master; // 构造方法,线程的名称为master的serverName + "-BalancerChore", // chore()方法循环调用周期为参数hbase.balancer.period,默认为5分钟 public BalancerChore(HMaster master) { super(master.getServerName() + "-BalancerChore", master.getConfiguration().getInt("hbase.balancer.period", 300000), master); this.master = master; } /** * 线程的run()方法会周期性的调用chore()方法 */ @Override protected void chore() { try { // 调用HMaster的balance()方法,完成实际的Region动态负载均衡 master.balance(); } catch (IOException e) { LOG.error("Failed to balance.", e); } } }
/** * @see java.lang.Thread#run() */ @Override public void run() { try { boolean initialChoreComplete = false; while (!this.stopper.isStopped()) { // 开始时间 long startTime = System.currentTimeMillis(); try { // 如果是第一次循环,完成初始化工作 if (!initialChoreComplete) { initialChoreComplete = initialChore(); } else { // 第一次后的每次循环,则周期性的调用chore()方法 chore(); } } catch (Exception e) { LOG.error("Caught exception", e); if (this.stopper.isStopped()) { continue; } } // 睡眠期睡眠一定的时间,然后再去调用chore()方法 this.sleeper.sleep(startTime); } } catch (Throwable t) { LOG.fatal(getName() + "error", t); } finally { LOG.info(getName() + " exiting"); cleanup(); } }
public boolean balance() throws IOException { // if master not initialized, don't run balancer. // 如果master没有被初始化,不能运行balancer if (!this.initialized) { LOG.debug("Master has not been initialized, don't run balancer."); return false; } // Do this call outside of synchronized block. int maximumBalanceTime = getBalancerCutoffTime(); synchronized (this.balancer) {// 在this.balancer上同步 // If balance not true, don't run balancer. // 从loadBalancerTracker处获取balancer是否已开启,如果没有,则返回false if (!this.loadBalancerTracker.isBalancerOn()) return false; // Only allow one balance run at at time. if (this.assignmentManager.getRegionStates().isRegionsInTransition()) { Map<String, RegionState> regionsInTransition = this.assignmentManager.getRegionStates().getRegionsInTransition(); LOG.debug("Not running balancer because " + regionsInTransition.size() + " region(s) in transition: " + org.apache.commons.lang.StringUtils. abbreviate(regionsInTransition.toString(), 256)); return false; } if (this.serverManager.areDeadServersInProgress()) { LOG.debug("Not running balancer because processing dead regionserver(s): " + this.serverManager.getDeadServers()); return false; } if (this.cpHost != null) { try { if (this.cpHost.preBalance()) { LOG.debug("Coprocessor bypassing balancer request"); return false; } } catch (IOException ioe) { LOG.error("Error invoking master coprocessor preBalance()", ioe); return false; } } // 获取表名->{ServerName->Region列表的映射集合}的映射集合assignmentsByTable Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable = this.assignmentManager.getRegionStates().getAssignmentsByTable(); List<RegionPlan> plans = new ArrayList<RegionPlan>(); //Give the balancer the current cluster state. // 设置balancer中集群最新的状态 this.balancer.setClusterStatus(getClusterStatus()); // 循环assignmentsByTable中的value:每个表的ServerName->Region列表的映射集合 for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) { List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments); if (partialPlans != null) plans.addAll(partialPlans); } long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; // 移动的Region总个数 int rpCount = 0; // number of RegionPlans balanced so far // Region的移动计划总耗时 long totalRegPlanExecTime = 0; // Region的移动计划不为空 if (plans != null && !plans.isEmpty()) { // 循环处理 for (RegionPlan plan: plans) { LOG.info("balance " + plan); // 开始时间 long balStartTime = System.currentTimeMillis(); //TODO: bulk assign // 调用assignmentManager的balance()方法执行计划 this.assignmentManager.balance(plan); // 累加Region的移动计划总耗时 totalRegPlanExecTime += System.currentTimeMillis()-balStartTime; // 累加移动的Region总个数 rpCount++; if (rpCount < plans.size() && // if performing next balance exceeds cutoff time, exit the loop // 如果完成下一个balance的时间超过cutoffTime,退出循环 // 这个完成时间是预估的,Region移动的平均耗时,用一个粗略的算法,已完成Region移动的总 耗时/已完成Region移动的总个数 (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now) LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + maximumBalanceTime); break; } } } // 如果协处理器主机不为空,运行协处理器的钩子方法postBalance() if (this.cpHost != null) { try { this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans); } catch (IOException ioe) { // balancing already succeeded so don't change the result LOG.error("Error invoking master coprocessor postBalance()", ioe); } } } // If LoadBalancer did not generate any plans, it means the cluster is already balanced. // Return true indicating a success. return true; }
// 获取表名->{ServerName->Region列表的映射集合}的映射集合assignmentsByTable Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable = this.assignmentManager.getRegionStates().getAssignmentsByTable();然后,通过上面提到的HMaster的成员变量balancer的balanceCluster()方法,获得Region的 移动计划列表,添加到数据结构List<RegionPlan>类型的plans中。如下:
// 循环assignmentsByTable中的value:每个表的ServerName->Region列表的映射集合 for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) { List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments); if (partialPlans != null) plans.addAll(partialPlans); }紧接着,循环处理这个移动计划列表plans,开始移动Region,如下:
// Region的移动计划不为空 if (plans != null && !plans.isEmpty()) { // 循环处理 for (RegionPlan plan: plans) { LOG.info("balance " + plan); // 开始时间 long balStartTime = System.currentTimeMillis(); //TODO: bulk assign // 调用assignmentManager的balance()方法执行计划 this.assignmentManager.balance(plan); // 累加Region的移动计划总耗时 totalRegPlanExecTime += System.currentTimeMillis()-balStartTime; // 累加移动的Region总个数 rpCount++; if (rpCount < plans.size() && // if performing next balance exceeds cutoff time, exit the loop // 如果完成下一个balance的时间超过cutoffTime,退出循环 // 这个完成时间是预估的,Region移动的平均耗时,用一个粗略的算法,已完成Region移动的总 耗时/已完成Region移动的总个数 (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now) LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + maximumBalanceTime); break; } } }移动计划的执行,实际上是由assignmentManager的balance()实现的,并且,在执行移动计划时,会根据以往执行过的计划的平均耗时是否超过一定阈值,来确定是继续此移动计划还是跳过转而执行下一个。
// 如果协处理器主机不为空,运行协处理器的钩子方法postBalance() if (this.cpHost != null) { try { this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans); } catch (IOException ioe) { // balancing already succeeded so don't change the result LOG.error("Error invoking master coprocessor postBalance()", ioe); } } }