The balance process moves blocks from datanodes whose storage utilization is above the cluster average to datanodes whose utilization is below the cluster average, until the balance criterion is met.
Source/target pairs are formed in the following priority order:
over-utilized ------> under-utilized
over-utilized ------> below-average
above-average ------> under-utilized
Datanodes are classified into four groups:
1. over-utilized: utilization is more than threshold above the cluster average
2. above-average: utilization is above the cluster average, but within threshold of it
3. below-average: utilization is below the cluster average, but within threshold of it
4. under-utilized: utilization is more than threshold below the cluster average
Balance criterion: for every datanode taking part in the balance, the difference between its utilization and the overall utilization of all participating datanodes is at most the given threshold.
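For example, with a cluster average utilization of 50% and threshold = 10: a datanode at 65% utilization is over-utilized, one at 55% is above-average, one at 47% is below-average, and one at 35% is under-utilized; balancing stops once every participating datanode is within 10 percentage points of the average.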
The balance runs as a series of iterations. At the start of each iteration the balancer classifies all participating datanodes according to the rules above and pairs them up into a set of <source, target> pairs; each <source, target> pair means that some blocks will be migrated from the source datanode to the target datanode.
Each <source, target> pair is handled by one thread, whose main work is:
1. Fetch a batch of blocks for this source from the namenode (at most 2 GB per fetch, at most 20 GB in total), filter them, and store them in the src_block list. One of the conditions that triggers a fetch: fewer than 5 blocks in src_block are still waiting to be migrated (a fixed, non-configurable value).
2. Submit the blocks in the src_block list to a thread pool (pool size: dfs.balancer.moverThreads, default 1000) for migration.
The migration itself is carried out by these mover threads (dfs.balancer.moverThreads of them), each thread moving one block at a time; a toy sketch of this dispatch loop is shown below.
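A toy sketch, in Java, of the per-source dispatch loop just described. The class name, the Block record and the two placeholder methods are invented for illustration; only the 2 GB / 20 GB / 5-block limits and the mover thread pool come from the description above. This is not the actual Dispatcher code.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of one <source, target> dispatch thread.
public class SourceDispatchSketch {
  static final long GB = 1024L * 1024 * 1024;

  record Block(long id, long numBytes) {}

  private final Queue<Block> srcBlocks = new ArrayDeque<>();   // the src_block list
  private final ExecutorService moverPool =
      Executors.newFixedThreadPool(1000);                      // dfs.balancer.moverThreads
  private long bytesFetched = 0;                               // capped at ~20 GB per iteration

  void dispatchBlocks(long bytesToMove) {
    while (bytesToMove > 0) {
      // refill when fewer than 5 un-migrated blocks remain (fixed value in the balancer)
      if (srcBlocks.size() < 5 && bytesFetched < 20 * GB) {
        bytesFetched += fetchFromNamenode(2 * GB);             // each fetch asks for at most 2 GB
      }
      Block b = srcBlocks.poll();
      if (b == null) {
        break;                                                 // nothing left to move
      }
      bytesToMove -= b.numBytes();
      moverPool.submit(() -> moveBlock(b));                    // one block per mover thread
    }
  }

  long fetchFromNamenode(long maxBytes) { return 0; }          // placeholder: ask NN for blocks on this source
  void moveBlock(Block b) { }                                  // placeholder: replaceBlock via proxy -> target
}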
Migration process:
step1: pick a block from the src_block list.
step2: among the datanodes holding replicas of this block, check whether any of them is on the same rack as the target; if so, choose it as the proxy, otherwise choose one of them at random as the proxy.
step3: the balancer tells the target to copy the block from the proxy to its local storage.
step4: if the copy succeeds, the target asks the namenode to delete the block replica on the source, and reports back to the balancer.
When a block is picked in step1 the target is checked, and when a proxy is picked in step2 each candidate datanode is checked. The checks cover (a minimal sketch follows this list):
1. whether the number of copy threads currently running on the node exceeds the maximum (dfs.datanode.balance.max.concurrent.moves);
2. whether the node is in its ban period: every failed copy bans the datanode from acting as proxy or target for 10 seconds.
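A minimal sketch of these two checks, with hypothetical field and method names (activeMoves, delayUntilMs, usableAsTargetOrProxy); the actual bookkeeping lives inside the Dispatcher, but the two conditions are the ones described above.

// Hypothetical per-datanode state used when deciding whether it may act as target or proxy.
class CandidateCheckSketch {
  int activeMoves;            // block moves currently running on this datanode
  int maxConcurrentMoves;     // dfs.datanode.balance.max.concurrent.moves
  long delayUntilMs;          // set to now + 10 000 ms after a failed move

  boolean usableAsTargetOrProxy() {
    return activeMoves < maxConcurrentMoves            // check 1: thread quota not reached
        && System.currentTimeMillis() >= delayUntilMs; // check 2: not in the 10 s ban period
  }
}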
Core methods and flow:
Balancer.runOneIteration()                       // run one iteration
  Dispatcher.init();                             // pick the datanodes that take part in the balance
  Balancer.init();                               // classify them into the four groups
  Balancer.chooseStorageGroups()                 // build <source, target> pairs: same-rack pairs first, then any remaining combination
  Dispatcher.dispatchAndCheckContinue()          // dispatch block-move tasks, source by source
    Dispatcher.dispatchBlockMoves();             // start one thread per source
      Source.dispatchBlocks();
        Source.getBlockList();                   // fetch up to 2 GB worth of blocks from the namenode
        Source.chooseNextMove()                  // pick one block at a time and hand it to the thread pool
          PendingMove.chooseBlockAndProxy();     // choose a block and a proxy
            PendingMove.markMovedIfGoodBlock();
              PendingMove.isGoodBlockCandidate();  // is this block a suitable candidate?
              PendingMove.chooseProxySource()      // choose a suitable proxy for the chosen block
Datanode classification:
private long init(List<DatanodeStorageReport> reports) {
  // compute average utilization
  for (DatanodeStorageReport r : reports) {
    policy.accumulateSpaces(r);
  }
  policy.initAvgUtilization();
  // create network topology and classify utilization collections:
  // over-utilized, above-average, below-average and under-utilized.
  long overLoadedBytes = 0L, underLoadedBytes = 0L;
  for(DatanodeStorageReport r : reports) {
    final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
    for(StorageType t : StorageType.getMovableTypes()) {
      // utilization (used storage / total storage) of each node taking part in the
      // balance [nodes that are live and contained in the include list]
      final Double utilization = policy.getUtilization(r, t);
      if (utilization == null) { // datanode does not have such storage type
        continue;
      }
      final long capacity = getCapacity(r, t);
      final double utilizationDiff = utilization - policy.getAvgUtilization(t);
      // the smaller thresholdDiff is, the closer this datanode is to the ideal state
      final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
      final long maxSize2Move = computeMaxSize2Move(capacity,
          getRemaining(r, t), utilizationDiff, threshold);
      final StorageGroup g;
      if (utilizationDiff > 0) {
        final Source s = dn.addSource(t, maxSize2Move, dispatcher);
        if (thresholdDiff <= 0) { // within threshold
          aboveAvgUtilized.add(s);
        } else {
          overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
          overUtilized.add(s);
        }
        g = s;
      } else {
        g = dn.addTarget(t, maxSize2Move);
        if (thresholdDiff <= 0) { // within threshold
          belowAvgUtilized.add(g);
        } else {
          underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
          underUtilized.add(g);
        }
      }
      dispatcher.getStorageGroupMap().put(g);
    }
  }
  logUtilizationCollections();
  Preconditions.checkState(dispatcher.getStorageGroupMap().size()
      == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
      + belowAvgUtilized.size(),
      "Mismatched number of storage groups");
  // return number of bytes to be moved in order to make the cluster balanced
  return Math.max(overLoadedBytes, underLoadedBytes);
}
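As a rough illustration (assuming precentage2bytes simply converts a percentage of the capacity into bytes): a 100 TB datanode whose utilization is 5 percentage points beyond the threshold band contributes about 5 TB to overLoadedBytes, and the method returns max(overLoadedBytes, underLoadedBytes) as the number of bytes that still has to move before the cluster is considered balanced.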
Choosing a proxy:
private boolean chooseProxySource() {
  final DatanodeInfo targetDN = target.getDatanodeInfo();
  // if source and target are same nodes then no need of proxy
  if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
    return true;
  }
  // if node group is supported, first try add nodes in the same node group
  if (cluster.isNodeGroupAware()) {
    for (StorageGroup loc : block.getLocations()) {
      if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
          && addTo(loc)) {
        return true;
      }
    }
  }
  // check if there is replica which is on the same rack with the target
  // prefer a datanode that holds a replica on the same rack as the target as the proxy;
  // addTo() also checks whether that datanode has already reached its limit of concurrent
  // balance (block-sending) threads
  for (StorageGroup loc : block.getLocations()) {
    if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
      return true;
    }
  }
  // find out a non-busy replica
  // if none of the above produced a proxy, pick any non-busy datanode that holds a replica
  for (StorageGroup loc : block.getLocations()) {
    if (addTo(loc)) {
      return true;
    }
  }
  return false;
}
Block migration:
step1: the balancer opens a socket connection to the target and sends a replaceBlock request, asking the target to copy a replica of the block from the proxy to its local storage so that it can replace the replica on the source.
step2: the target sends a copyBlock request to the proxy and copies the block replica to its local storage. Once the copy completes, the target calls notifyNamenodeReceivedBlock, which builds a ReceivedDeletedBlockInfo object and queues it; on the next heartbeat this object tells the namenode to add the new replica on the target to the block map and to delete the corresponding replica on the source.
private void dispatch() {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Start moving " + this);
  }
  Socket sock = new Socket();
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    // the balancer connects to the target
    sock.connect(
        NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
        HdfsServerConstants.READ_TIMEOUT);
    sock.setKeepAlive(true);
    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
        block.getBlock());
    final KeyManager km = nnc.getKeyManager();
    Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
    IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
        unbufIn, km, accessToken, target.getDatanodeInfo());
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    in = new DataInputStream(new BufferedInputStream(unbufIn,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    // ask the target to copy the block replica
    sendRequest(out, eb, accessToken);
    receiveResponse(in);
    nnc.getBytesMoved().addAndGet(block.getNumBytes());
    LOG.info("Successfully moved " + this);
  } catch (IOException e) {
    LOG.warn("Failed to move " + this + ": " + e.getMessage());
    target.getDDatanode().setHasFailure();
    // Proxy or target may have some issues, delay before using these nodes
    // further in order to avoid a potential storm of "threads quota
    // exceeded" warnings when the dispatcher gets out of sync with work
    // going on in datanodes.
    // the move failed, possibly because the proxy or target is too busy (handling too
    // many concurrent blockReplace operations), so delay their participation in the balance
    proxySource.activateDelay(delayAfterErrors);
    target.getDDatanode().activateDelay(delayAfterErrors);
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);
    // whether the move succeeded or failed, remove this block from the pending queues
    proxySource.removePendingBlock(this);
    target.getDDatanode().removePendingBlock(this);
    synchronized (this) {
      reset();
    }
    synchronized (Dispatcher.this) {
      Dispatcher.this.notifyAll();
    }
  }
}
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
    Token<BlockTokenIdentifier> accessToken) throws IOException {
  new Sender(out).replaceBlock(eb, target.storageType, accessToken,
      source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
  // the 4th argument is the source's uuid (which uniquely identifies a datanode on the
  // namenode); it is used to tell the namenode to delete the replica on the source
}
Log on the target after the copy completes:
2019-07-03 10:15:41,319 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Moved BP-691805646-10.116.100.3-1498296722300:blk_2306375028_1234536481 from /10.116.100.149:36907, delHint=7a29645c-cc12-44ce-b9c3-6642a82317c4
Common exceptions:
Target-side exception:
WARN balancer.Dispatcher: Failed to move blk_10818540065_9759022625 with size=12178 from 10.116.100.126:50010:DISK to 10.116.100.143:50010:DISK through 10.116.102.93:50010: Got error, status message Not able to receive block 10818540065 from /10.116.100.149:57891 because threads quota is exceeded., block move is failed
Proxy-side exception:
WARN balancer.Dispatcher: Failed to move blk_13904085291_12851796248 with size=412 from 10.116.101.126:50010:DISK to 10.116.101.227:50010:DISK through 10.116.100.51:50010: Got error, status message opReplaceBlock BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 received exception java.io.IOException: Got error, status message Not able to copy block 13904085291 to /10.116.101.227:42066 because threads quota is exceeded., copy block BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 from /10.116.100.51:50010, block move is failed
"threads quota is exceeded" means that on the datanode (target or proxy) the number of threads currently doing balance work, blockReplace on the target and blockCopy on the proxy, exceeds the limit (dfs.datanode.balance.max.concurrent.moves), so the block move fails.
If this exception appears in large numbers, the balance becomes slow:
Direct cause: block moves fail.
Indirect cause: the failing targets and proxies enter their ban period, so fewer proxies are available; as a result the blocks in src_block are not consumed in time and no new blocks can be fetched.
How to speed up a slow balance (an illustrative command line follows this list):
1. Increase dfs.datanode.balance.max.concurrent.moves on the datanodes. A larger value relieves the "threads quota is exceeded" problem and speeds up the balance. Changing this parameter requires a datanode restart.
2. Increase dfs.datanode.balance.max.concurrent.moves on the balancer as well. The value on the balancer side should be slightly smaller than the one on the datanodes, because the two processes may briefly get out of sync.
3. Increase -threshold. With a larger threshold, the datanodes with the highest utilization are migrated first; once their utilization has come down, lower the threshold again.
4. With multiple namespaces, change -policy from the default datanode to blockpool, so that whether a datanode needs balancing is judged by the utilization of the block pool of each namespace.
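An illustrative example only (the numbers are placeholders, not recommendations, and the -D generic options are assumed to be honoured because the balancer is started through ToolRunner): first raise dfs.datanode.balance.max.concurrent.moves in hdfs-site.xml on the datanodes (say to 50) and restart them, then start the balancer with a slightly smaller value, a larger threshold and, for multiple namespaces, the blockpool policy:
hdfs balancer -Ddfs.datanode.balance.max.concurrent.moves=40 \
    -Ddfs.balancer.moverThreads=1000 \
    -threshold 20 -policy blockpool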