PressureAwareCompactionThroughputController里面的tuneChore定期获取server.getCompactionPressure()
用来做流量的计算
获取本server上所有region的所有store,store.getCompactionPressure最大的一个
store.getCompactionPressure如何计算
public double getCompactionPressure() {
return storeEngine.getStoreFileManager().getCompactionPressure();
}
@Override
public double getCompactionPressure() {
int storefileCount = getStorefileCount(); //文件数量
int minFilesToCompact = comConf.getMinFilesToCompact(); "hbase.hstore.compaction.min" 旧配置 “hbase.hstore.compactionThreshold” 未配置的为3
if (storefileCount <= minFilesToCompact) {
return 0.0;
}
//blockingFileCount = "hbase.hstore.blockingStoreFiles" 默认为7
//这个公式含义: 现在可compact的文件数是否超过需要blocking的数量
return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
}
private void tune(double compactionPressure) {
double maxThroughputToSet;
// compactionPressure > 1.0 //超过了blocking File 不限流
if (compactionPressure > 1.0) {
// set to unlimited if some stores already reach the blocking store file count
maxThroughputToSet = Double.MAX_VALUE;
// 在空闲时间内
} else if (offPeakHours.isOffPeakHour()) {
maxThroughputToSet = maxThroughputOffpeak;
}
//不在空闲时间内
else {
// compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
* compactionPressure;
}
if (LOG.isDebugEnabled()) {
LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.maxThroughput = maxThroughputToSet;
}
//=====================================================================================================
@Override
public long control(String compactionName, long size) throws InterruptedException {
ActiveCompaction compaction = activeCompactions.get(compactionName);
compaction.totalSize += size;//当前这个compact的size累加
long deltaSize = compaction.totalSize - compaction.lastControlSize; //变化的值
//controlPerSize = max; 如果变化的值大于controlPerSize直接返回0
if (deltaSize < controlPerSize) {
return 0;
}
long now = EnvironmentEdgeManager.currentTimeMillis();
//maxThroughputPerCompaction(每个compact 流量) = 机器compact流量最大值 / 当前活跃的Compact任务数量
double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
// 最小允许时间 = 本次需要的流量 / 每个compact可用流量 *1000 ( 单位 ms)
long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
// 从上次触发control到这次的时间
long elapsedTime = now - compaction.lastControlTime;
compaction.lastControlSize = compaction.totalSize;
//如果 已经经过的时间 大于 最小控制时间 (说明时间已经充足了,直接返回0)
if (elapsedTime >= minTimeAllowed) {
compaction.lastControlTime = EnvironmentEdgeManager.currentTimeMillis();
return 0;
}
// 如果经过的时间,小于等于最小允许时间 说明:too fast
// sleepTime 可允许的时间-已经等待的时间
long sleepTime = minTimeAllowed - elapsedTime;
if (LOG.isDebugEnabled()) {
// do not log too much
if (now - compaction.lastLogTime > 60L * 1000) {
LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
+ throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ throughputDesc(maxThroughputPerCompaction) + ", already slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms till now.");
compaction.lastLogTime = now;
}
}
Thread.sleep(sleepTime);
compaction.numberOfSleeps++;
compaction.totalSleepTime += sleepTime;
compaction.lastControlTime = EnvironmentEdgeManager.currentTimeMillis();
return sleepTime;
}
gloria_y 发布了52 篇原创文章 · 获赞 4 · 访问量 5万+ 私信 关注