HDFS Balancer Internals

The balancing process moves excess blocks from datanodes whose storage utilization is above the cluster average to datanodes below the cluster average, until the balancing criterion is met.
  
over-utilized ------> under-utilized

over-utilized ------> below-average

above-average ------> under-utilized
  
1. over-utilized: utilization is more than threshold above the cluster average (utilization > avg + threshold)

2. above-average: utilization is above the average but within threshold (avg < utilization <= avg + threshold)

3. below-average: utilization is below the average but within threshold (avg - threshold <= utilization < avg)

4. under-utilized: utilization is more than threshold below the average (utilization < avg - threshold)
  
Balancing criterion: for every datanode participating in the balance, the difference between its utilization and the overall utilization across all participating datanodes stays within the given threshold.
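As a minimal illustration of the four buckets (a hypothetical standalone sketch, not the Balancer's actual code, which is listed later in this article):

import static java.lang.System.out;

public class UtilizationClass {
  // Classify one datanode by how far its utilization is from the cluster
  // average; "threshold" is the value passed to the balancer via -threshold.
  static String classify(double utilization, double avg, double threshold) {
    final double diff = utilization - avg;
    if (diff > threshold)   return "over-utilized";  // more than threshold above avg
    if (diff > 0)           return "above-average";  // above avg, within threshold
    if (diff >= -threshold) return "below-average";  // below avg, within threshold
    return "under-utilized";                         // more than threshold below avg
  }

  public static void main(String[] args) {
    // e.g. average utilization 50%, threshold 10 percentage points:
    out.println(classify(65, 50, 10)); // over-utilized
    out.println(classify(45, 50, 10)); // below-average
  }
}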
  
The balance run proceeds in iterations. At the start of each iteration the balancer classifies all participating datanodes by the rules above and pairs them up into a set of <source, target> pairs; each <source, target> pair means that some blocks on the source datanode will be migrated to the target datanode.
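The pairing preference can be sketched as follows (a hypothetical toy version; the real code, Balancer.chooseStorageGroups(), additionally tries same-rack matches within each pass before arbitrary ones):

import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the order in which <source, target> pairs are built.
class PairingSketch {
  static void match(Queue<String> sources, Queue<String> targets) {
    while (!sources.isEmpty() && !targets.isEmpty()) {
      System.out.println(sources.poll() + " ------> " + targets.poll());
    }
  }

  public static void main(String[] args) {
    Queue<String> over  = new ArrayDeque<>(java.util.List.of("dn1", "dn2"));
    Queue<String> above = new ArrayDeque<>(java.util.List.of("dn3"));
    Queue<String> below = new ArrayDeque<>(java.util.List.of("dn4"));
    Queue<String> under = new ArrayDeque<>(java.util.List.of("dn5"));
    match(over, under);  // over-utilized  ------> under-utilized
    match(over, below);  // over-utilized  ------> below-average
    match(above, under); // above-average  ------> under-utilized
  }
}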
  
Each <source, target> pair is served by one thread, whose main duties are:

1. Fetch a batch of blocks from the namenode (at most 2 GB per fetch, at most 20 GB in total per source), filter them, and keep the survivors in the src_block list. One of the conditions that triggers a fetch: fewer than 5 blocks in src_block are still waiting to be migrated (a hard-coded value, not configurable).

2. Submit the blocks in src_block to a thread pool (pool size: dfs.balancer.moverThreads, default 1000) for migration.

The migration itself is therefore multi-threaded (dfs.balancer.moverThreads threads), each thread moving one block at a time; a sketch of this dispatch loop follows.
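A minimal sketch of the dispatch loop, under simplified, hypothetical types (the real loop is Source.dispatchBlocks(), listed in the call flow below):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of one <source, target> dispatch thread.
class SourceSketch {
  // Shared mover pool; size corresponds to dfs.balancer.moverThreads (1000).
  static final ExecutorService moverPool = Executors.newFixedThreadPool(1000);

  final Deque<Runnable> srcBlocks = new ArrayDeque<>(); // filtered pending moves

  void dispatchBlocks() {
    // The real loop runs until this source's scheduled bytes are exhausted;
    // bounded here to a fixed number of rounds for simplicity.
    for (int round = 0; round < 100; round++) {
      if (srcBlocks.size() < 5) {  // hard-coded refill trigger
        fetchBlocksFromNamenode(); // getBlocks() RPC: <= 2 GB per call, <= 20 GB total
      }
      Runnable move = srcBlocks.poll();
      if (move == null) {
        break; // no candidate block could be selected
      }
      moverPool.submit(move); // each task migrates exactly one block
    }
  }

  void fetchBlocksFromNamenode() { /* omitted in this sketch */ }
}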
  
Migration procedure:

step 1: pick a block from the src_block list.

step 2: check whether any datanode holding a replica of the block sits on the same rack as the target; if so, choose it as the proxy, otherwise choose one of the replica holders as the proxy.

step 3: the balancer tells the target to copy the block from the proxy to itself.

step 4: if the copy succeeds, the target asks the namenode to delete the replica on the source, and reports the result back to the balancer.
  
When picking a block in step 1 the target is checked, and when picking a proxy in step 2 each candidate datanode is checked. The checks cover:

1. whether the number of threads currently performing copies exceeds the maximum (dfs.datanode.balance.max.concurrent.moves);

2. whether the node is inside its ban period: every failed copy bans the datanode from serving as a proxy or target for 10 seconds (sketched below).
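The ban bookkeeping can be sketched like this (a standalone approximation of DDatanode.activateDelay()/isDelayActive(), which the dispatch code later in this article actually calls):

// Hypothetical standalone sketch of the per-datanode ban period.
class DelaySketch {
  private long delayUntil = 0L;

  // Called after a failed move; bans this node for delayMillis (10s by default).
  synchronized void activateDelay(long delayMillis) {
    delayUntil = System.currentTimeMillis() + delayMillis;
  }

  // While the ban is active, the node may not serve as a proxy or target.
  synchronized boolean isDelayActive() {
    if (delayUntil == 0 || System.currentTimeMillis() > delayUntil) {
      delayUntil = 0;
      return false;
    }
    return true;
  }
}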
  
Core methods and call flow:
  
Balancer.runOneIteration()                 // start one iteration
  Dispatcher.init();                       // pick the datanodes participating in the balance
  Balancer.init();                         // classify them into the four groups
  Balancer.chooseStorageGroups()           // build <source, target> pairs: same rack first, then any match
  Dispatcher.dispatchAndCheckContinue()    // dispatch block-move work, one unit per source
    Dispatcher.dispatchBlockMoves();       // start one thread per source
      Source.dispatchBlocks();
        Source.getBlockList();             // fetch up to 2 GB worth of blocks from the namenode
        Source.chooseNextMove()            // pick one block at a time and hand it to the mover pool
          PendingMove.chooseBlockAndProxy();      // pick a block and a proxy
            PendingMove.markMovedIfGoodBlock();
              PendingMove.isGoodBlockCandidate(); // is this block a suitable candidate?
            PendingMove.chooseProxySource()       // pick a suitable proxy for the chosen block
  
Classifying the datanodes:
  
private long init(List<DatanodeStorageReport> reports) {
  // compute average utilization
  for (DatanodeStorageReport r : reports) {
    policy.accumulateSpaces(r);
  }
  policy.initAvgUtilization();

  // create network topology and classify utilization collections:
  // over-utilized, above-average, below-average and under-utilized.
  long overLoadedBytes = 0L, underLoadedBytes = 0L;
  for (DatanodeStorageReport r : reports) {
    final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
    for (StorageType t : StorageType.getMovableTypes()) {
      // utilization (used storage / total storage) of this storage type on a
      // participating datanode (live and contained in the include list)
      final Double utilization = policy.getUtilization(r, t);
      if (utilization == null) { // datanode does not have such storage type
        continue;
      }
      final long capacity = getCapacity(r, t);
      final double utilizationDiff = utilization - policy.getAvgUtilization(t);
      // the smaller thresholdDiff is, the closer this datanode is to the ideal
      final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
      final long maxSize2Move = computeMaxSize2Move(capacity,
          getRemaining(r, t), utilizationDiff, threshold);
      final StorageGroup g;
      if (utilizationDiff > 0) {
        final Source s = dn.addSource(t, maxSize2Move, dispatcher);
        if (thresholdDiff <= 0) { // within threshold
          aboveAvgUtilized.add(s);
        } else {
          overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
          overUtilized.add(s);
        }
        g = s;
      } else {
        g = dn.addTarget(t, maxSize2Move);
        if (thresholdDiff <= 0) { // within threshold
          belowAvgUtilized.add(g);
        } else {
          underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
          underUtilized.add(g);
        }
      }
      dispatcher.getStorageGroupMap().put(g);
    }
  }

  logUtilizationCollections();
  Preconditions.checkState(dispatcher.getStorageGroupMap().size()
      == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
      + belowAvgUtilized.size(),
      "Mismatched number of storage groups");

  // return number of bytes to be moved in order to make the cluster balanced
  return Math.max(overLoadedBytes, underLoadedBytes);
}
  
Choosing a proxy:
  
private boolean chooseProxySource() {
  final DatanodeInfo targetDN = target.getDatanodeInfo();
  // if source and target are same nodes then no need of proxy
  if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
    return true;
  }
  // if node group is supported, first try add nodes in the same node group
  if (cluster.isNodeGroupAware()) {
    for (StorageGroup loc : block.getLocations()) {
      if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
          && addTo(loc)) {
        return true;
      }
    }
  }
  // check if there is replica which is on the same rack with the target
  // prefer a replica holder on the same rack as the target; addTo() also
  // verifies that the datanode's current number of balance (block-sending)
  // threads is not over the limit
  for (StorageGroup loc : block.getLocations()) {
    if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
      return true;
    }
  }
  // find out a non-busy replica
  // if none of the above produced a proxy, pick any non-busy replica holder
  for (StorageGroup loc : block.getLocations()) {
    if (addTo(loc)) {
      return true;
    }
  }
  return false;
}
  
Block migration:
  
step 1: the balancer opens a socket to the target and issues a replaceBlock request, asking the target to fetch a replica of the block from the proxy and replace the replica held on the source.

step 2: the target sends a copyBlock request to the proxy and copies the replica to local storage. Once the copy completes, the target calls notifyNamenodeReceivedBlock, which builds a ReceivedDeletedBlockInfo object and queues it; with the next heartbeat the namenode is told to add the new replica on the target to the blockmap and to delete the corresponding replica on the source.
  
private void dispatch() {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Start moving " + this);
  }
  Socket sock = new Socket();
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    // the balancer connects to the target
    sock.connect(
        NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
        HdfsServerConstants.READ_TIMEOUT);
    sock.setKeepAlive(true);

    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
        block.getBlock());
    final KeyManager km = nnc.getKeyManager();
    Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
    IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
        unbufIn, km, accessToken, target.getDatanodeInfo());
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    in = new DataInputStream(new BufferedInputStream(unbufIn,
        HdfsConstants.IO_FILE_BUFFER_SIZE));

    // ask the target to copy the block replica from the proxy
    sendRequest(out, eb, accessToken);
    receiveResponse(in);
    nnc.getBytesMoved().addAndGet(block.getNumBytes());
    LOG.info("Successfully moved " + this);
  } catch (IOException e) {
    LOG.warn("Failed to move " + this + ": " + e.getMessage());
    target.getDDatanode().setHasFailure();
    // Proxy or target may have some issues, delay before using these nodes
    // further in order to avoid a potential storm of "threads quota
    // exceeded" warnings when the dispatcher gets out of sync with work
    // going on in datanodes.
    // The move failed, possibly because the proxy or target is handling too
    // many concurrent blockReplace operations, so delay their further
    // participation in the balance.
    proxySource.activateDelay(delayAfterErrors);
    target.getDDatanode().activateDelay(delayAfterErrors);
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);
    // whether the move succeeded or failed, drop this block from the queues
    proxySource.removePendingBlock(this);
    target.getDDatanode().removePendingBlock(this);
    synchronized (this) {
      reset();
    }
    synchronized (Dispatcher.this) {
      Dispatcher.this.notifyAll();
    }
  }
}
  
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
    Token<BlockTokenIdentifier> accessToken) throws IOException {
  // the 4th argument is the source's UUID (which uniquely identifies a
  // datanode on the namenode); it is the hint used to tell the namenode to
  // delete the replica on the source
  new Sender(out).replaceBlock(eb, target.storageType, accessToken,
      source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
  
Log line on the target after the copy completes:
  
  2019-07-03 10:15:41,319 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Moved BP-691805646-10.116.100.3-1498296722300:blk_2306375028_1234536481 from /10.116.100.149:36907, delHint=7a29645c-cc12-44ce-b9c3-6642a82317c4
  
Common errors:

Error on the target:
  
  WARN balancer.Dispatcher: Failed to move blk_10818540065_9759022625 with size=12178 from 10.116.100.126:50010:DISK to 10.116.100.143:50010:DISK through 10.116.102.93:50010: Got error, status message Not able to receive block 10818540065 from /10.116.100.149:57891 because threads quota is exceeded., block move is failed
  
Error on the proxy:
  
  WARN balancer.Dispatcher: Failed to move blk_13904085291_12851796248 with size=412 from 10.116.101.126:50010:DISK to 10.116.101.227:50010:DISK through 10.116.100.51:50010: Got error, status message opReplaceBlock BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 received exception java.io.IOException: Got error, status message Not able to copy block 13904085291 to /10.116.101.227:42066 because threads quota is exceeded., copy block BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 from /10.116.100.51:50010, block move is failed
  
threads quota is exceeded means that on the datanode (target or proxy) the number of threads currently doing balance work, blockReplace on the target and blockCopy on the proxy, exceeds the limit (dfs.datanode.balance.max.concurrent.moves), so the block move fails.

If this error shows up in large numbers, the balance becomes very slow:

Direct cause: block moves fail.

Indirect cause: the failing targets and proxies enter their ban period, shrinking the pool of eligible proxies, so the blocks in src_block are not consumed in time and no new blocks can be fetched.
  
How to speed up a slow balance:

1. Increase the datanode-side mover thread limit dfs.datanode.balance.max.concurrent.moves. This relieves the threads quota is exceeded errors and speeds up the balance. Changing this parameter requires a datanode restart.

2. Increase dfs.datanode.balance.max.concurrent.moves on the balancer side as well. The balancer's value should be slightly smaller than the datanodes' value, since the two processes can get out of sync.

3. Increase -threshold. A larger threshold concentrates the work on the most heavily used datanodes first; once their utilization has come down, lower the threshold again.

4. With multiple namespaces, change -policy from the default datanode to blockpool, so that whether a datanode needs balancing is judged by the utilization of the blockpool belonging to each namespace.
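For example (the option values here are illustrative), a first pass could be run as

hdfs balancer -threshold 20 -policy blockpool

and then repeated with a smaller -threshold once the hottest datanodes have drained.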
