A Region compaction request is triggered right after a Region MemStore flush:
boolean shouldCompact = region.flushcache();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
  this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
  server.compactSplitThread.requestCompaction(region, getName());
}
server.getMetrics().addFlush(region.getRecentFlushInfo());
The region.flushcache method performs the flush for this Region instance. If its return value shouldCompact is true, the Region has met the compaction criteria, but a compaction request is not necessarily issued at that point: the shouldSplit check comes first, and a compaction request is issued only when shouldSplit is false and shouldCompact is true.
Note: for a given Region, flush, split, and compaction each run in their own thread. How the three are coordinated will be covered in detail later; here we focus only on what triggers a compaction and, once one runs, how the StoreFiles to merge are selected.
Whether a Region compaction is triggered (i.e. whether shouldCompact is true) is decided by the number of StoreFiles in each of the Region's Stores:
/**
* See if there's too much store files in this store
*
* @return true if number of store files is greater than the number defined
* in minFilesToCompact
*/
public boolean needsCompaction() {
return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
}
When a Region is flushed, its Stores are flushed one after another; after each Store finishes flushing, the code above is invoked to make the decision, where:
storefiles: the total number of StoreFiles in the Store after the flush;
filesCompacting: the number of StoreFiles in the Store currently in the compacting state, i.e. StoreFiles that have already been selected for a compaction; the check must ignore these;
minFilesToCompact: the lower bound that the number of StoreFiles in the Store (excluding those already selected for compaction) must exceed before a compaction is triggered; its value is computed by the following code:
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2, conf.getInt(
    "hbase.hstore.compaction.min",
    /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
Note: hbase.hstore.compactionThreshold is a legacy configuration key; whenever hbase.hstore.compaction.min is present in the configuration, the legacy key is ignored.
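The precedence between the two keys, together with the floor of 2, can be sketched in plain Java (a hypothetical CompactionMinDemo class; a Map stands in for Hadoop's Configuration, whose getInt(key, default) it imitates — this is not HBase code):

```java
import java.util.HashMap;
import java.util.Map;

public class CompactionMinDemo {
    // imitates Configuration.getInt(key, defaultValue)
    static int getInt(Map<String, String> conf, String key, int dflt) {
        String v = conf.get(key);
        return v == null ? dflt : Integer.parseInt(v);
    }

    static int minFilesToCompact(Map<String, String> conf) {
        // the new key wins; the old key only supplies the fallback default;
        // the result is floored at 2
        return Math.max(2, getInt(conf, "hbase.hstore.compaction.min",
                getInt(conf, "hbase.hstore.compactionThreshold", 3)));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(minFilesToCompact(conf)); // 3 (built-in default)
        conf.put("hbase.hstore.compactionThreshold", "5");
        System.out.println(minFilesToCompact(conf)); // 5 (legacy key as fallback)
        conf.put("hbase.hstore.compaction.min", "4");
        System.out.println(minFilesToCompact(conf)); // 4 (new key overrides legacy)
    }
}
```

With only the legacy key set it acts as the fallback default; once the new key appears the legacy value is ignored, and a configured value below 2 is raised to the floor of 2.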
As shown above, shouldCompact is true only when some Store in the current Region has accumulated enough StoreFiles (excluding those already being compacted) to exceed the lower bound. When the condition holds, a concrete compaction request is issued via requestCompaction. requestCompaction has several overloads; execution eventually reaches the following one:
@Override
public synchronized List<CompactionRequest> requestCompaction(
    final HRegion r, final String why, int pri,
    final List<CompactionRequest> requests) throws IOException {
  List<CompactionRequest> ret;
  // not a special compaction request, so make our own list
  if (requests == null) {
    ret = new ArrayList<CompactionRequest>(r.getStores().size());
    for (Store s : r.getStores().values()) {
      ret.add(requestCompaction(r, s, why, pri, null));
    }
  } else {
    ret = new ArrayList<CompactionRequest>(requests.size());
    for (CompactionRequest request : requests) {
      ret.add(requestCompaction(r, request.getStore(), why, pri, request));
    }
  }
  return ret;
}
As this shows, the actual compaction request is also issued per Store:
@Override
public synchronized CompactionRequest requestCompaction(final HRegion r,
    final Store s, final String why, int priority,
    CompactionRequest request) throws IOException {
  if (this.server.isStopped()) {
    return null;
  }
  CompactionRequest cr = s.requestCompaction(priority, request);
  if (cr != null) {
    cr.setServer(server);
    if (priority != Store.NO_PRIORITY) {
      cr.setPriority(priority);
    }
    ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
        ? largeCompactions : smallCompactions;
    pool.execute(cr);
    if (LOG.isDebugEnabled()) {
      String type = (pool == smallCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + cr
          + (why != null && !why.isEmpty() ? "; Because: " + why : "")
          + "; " + this);
    }
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not compacting " + r.getRegionNameAsString()
          + " because compaction request was cancelled");
    }
  }
  return cr;
}
A CompactionRequest object is first built from the Store's state (it implements Runnable, so a "request" is really just a Runnable executed on a thread). Then, based on the total size (not the number) of the StoreFiles selected for compaction, the request is classified as either a large or a small compaction and submitted to the corresponding thread pool.
The choice between large and small compaction is made as follows:
boolean throttleCompaction(long compactionSize) {
  long throttlePoint = conf.getLong(
      "hbase.regionserver.thread.compaction.throttle",
      2 * this.minFilesToCompact * this.region.memstoreFlushSize);
  return compactionSize > throttlePoint;
}
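Plugging in defaults (minFilesToCompact = 3 and, assuming the usual 128 MB memstore flush size, a throttle point of 2 * 3 * 128 MB = 768 MB), the routing works out as below (a hypothetical ThrottleDemo sketch with no HBase dependency):

```java
public class ThrottleDemo {
    // mirrors throttleCompaction(): above the throttle point -> large pool
    static boolean throttleCompaction(long compactionSize, long throttlePoint) {
        return compactionSize > throttlePoint;
    }

    public static void main(String[] args) {
        long throttlePoint = 2L * 3 * 128 * 1024 * 1024; // 768 MB with defaults
        // 700 MB of selected files -> small compaction pool
        System.out.println(throttleCompaction(700L * 1024 * 1024, throttlePoint)); // false
        // 800 MB of selected files -> large compaction pool
        System.out.println(throttleCompaction(800L * 1024 * 1024, throttlePoint)); // true
    }
}
```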
At this point the CompactionRequest has been submitted to the appropriate thread pool, where it will be executed when the pool's scheduling policy allows.
CompactionRequest
How a CompactionRequest object is built from a Store's state, i.e. how the StoreFiles to compact are chosen from the Store, is the job of Store's requestCompaction method; its core code is described below.
The bulk of the work happens inside a synchronized block:
synchronized (filesCompacting) {
......
}
Because repeated MemStore flushes can produce multiple compaction requests for the same Region, filesCompacting (the list of StoreFiles already selected for compaction) must be accessed under mutual exclusion so that no StoreFile ends up in more than one compaction.
All of the code that follows lives inside this synchronized block.
// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);
All StoreFiles of the Store are taken as candidates. storefiles is sorted by StoreFile.Comparators.FLUSH_TIME, so candidates (and, later, filesCompacting) are ordered from older to newer.
if (!filesCompacting.isEmpty()) {
  // exclude all files older than the newest file we're currently
  // compacting. this allows us to preserve contiguity (HBASE-2856)
  StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
  int idx = candidates.indexOf(last);
  Preconditions.checkArgument(idx != -1);
  candidates.subList(0, idx + 1).clear();
}
If filesCompacting is not empty, some StoreFiles must be removed from candidates:
(1) take the last StoreFile in filesCompacting, i.e. the one with the newest flush time, and call it last;
(2) remove from candidates every StoreFile no newer than last, including last itself.
The point is to keep compactions contiguous: each compaction merges as many newly produced StoreFiles as possible and never revisits StoreFiles that have already been merged or are currently being compacted.
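The pruning can be reproduced on plain strings (a hypothetical ContiguityDemo sketch; the names sf1..sf5 stand for StoreFiles ordered older to newer):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContiguityDemo {
    // mirrors the pruning: drop everything up to and including the newest
    // file already being compacted, preserving contiguity
    static List<String> prune(List<String> candidates, List<String> filesCompacting) {
        if (!filesCompacting.isEmpty()) {
            String last = filesCompacting.get(filesCompacting.size() - 1);
            int idx = candidates.indexOf(last);
            candidates.subList(0, idx + 1).clear(); // subList is a live view
        }
        return candidates;
    }

    public static void main(String[] args) {
        List<String> candidates = new ArrayList<>(
                Arrays.asList("sf1", "sf2", "sf3", "sf4", "sf5")); // older -> newer
        System.out.println(prune(candidates, Arrays.asList("sf1", "sf2")));
        // [sf3, sf4, sf5] : sf1 and sf2 are dropped, only newer files remain
    }
}
```

Note that clearing a subList mutates the backing list in place, which is exactly how the HBase code trims candidates.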
boolean override = false;
if (region.getCoprocessorHost() != null) {
  override = region.getCoprocessorHost().preCompactSelection(
      this, candidates, request);
}
CompactSelection filesToCompact;
if (override) {
  // coprocessor is overriding normal file selection
  filesToCompact = new CompactSelection(conf, candidates);
} else {
  filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
  region.getCoprocessorHost().postCompactSelection(this,
      ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
}
Coprocessor influence on compaction is out of scope here, so override is false and the compactSelection algorithm (described in detail below) performs the selection; the result is stored in filesToCompact.
// no files to compact
if (filesToCompact.getFilesToCompact().isEmpty()) {
return null;
}
If the selection comes back empty, no compaction is needed and the method simply returns null.
// basic sanity check: do not try to compact the same StoreFile
// twice.
if (!Collections.disjoint(filesCompacting,
filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after
// sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}
This is a sanity check on the selection result: none of the selected StoreFiles may already appear in filesCompacting, i.e. the same StoreFile must never be compacted twice.
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
The selected files are added to filesCompacting, which is then re-sorted by flush time.
// major compaction iff all StoreFiles are included
boolean isMajor =
    (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
  // since we're enqueuing a major, update the compaction wait interval
  this.forceMajor = false;
}
If the selection includes every StoreFile in the Store, this is a major compaction, and the forced-major flag is cleared.
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
Next, the priority of this compaction is computed:
/**
* @return The priority that this store should have in the compaction queue
* @param priority
*/
public int getCompactPriority(int priority) {
// If this is a user-requested compaction, leave this at the highest
// priority
if (priority == PRIORITY_USER) {
return PRIORITY_USER;
} else {
return this.blockingStoreFileCount - this.storefiles.size();
}
}
If the compaction request was issued by a user, it is given the highest priority, PRIORITY_USER (1); otherwise the priority is the difference shown above, where blockingStoreFileCount comes from:
this.blockingStoreFileCount = conf.getInt(
"hbase.hstore.blockingStoreFiles", 7);
The following code builds the concrete CompactionRequest object. In the flow we are describing, request is null, so the object is simply created via the constructor.
// not a special compaction request, so we need to make one
if (request == null) {
  request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
} else {
  // update the request with what the system thinks the request should be
  // its up to the request if it wants to listen
  request.setSelection(filesToCompact);
  request.setIsMajor(isMajor);
  request.setPriority(pri);
}
This completes the construction of a CompactionRequest.
CompactSelection
The selection algorithm is implemented by the compactSelection method:
/**
* Algorithm to choose which files to compact
*
* Configuration knobs:
*
* "hbase.hstore.compaction.ratio" normal case: minor compact when file <=
* sum(smaller_files) * ratio
*
* "hbase.hstore.compaction.min.size" unconditionally compact individual
* files below this size
*
* "hbase.hstore.compaction.max.size" never compact individual files above
* this size (unless splitting)
*
* "hbase.hstore.compaction.min" min files needed to minor compact
*
* "hbase.hstore.compaction.max" max files to compact at once (avoids OOM)
*
* @param candidates
* candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
CompactSelection compactSelection(List<StoreFile> candidates, int priority)
throws IOException {
......
}
The method begins with the following comment:
// ASSUMPTION!!! filesCompacting is locked when calling this function

/* normal skew:
 *
 *         older ----> newer
 *     _
 *    | |   _
 *    | |  | |   _
 *  --|-|- |-|- |-|---_-------_-------  minCompactSize
 *    | |  | |  | |  | |  _  | |
 *    | |  | |  | |  | | | | | |
 *    | |  | |  | |  | | | | | |
 */
The selection proceeds from oldest to newest; how minCompactSize (the dashed line in the diagram) is used is shown in the code below.
CompactSelection compactSelection = new CompactSelection(conf,
candidates);
A CompactSelection object is created; its constructor mainly initializes some parameters.
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
forcemajor is computed from forceMajor (already determined before compactSelection was called) and the state of filesCompacting. If forcemajor is false, the following code runs:
if (!forcemajor) {
  // Delete the expired store files before the compaction selection.
  if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
      && (ttl != Long.MAX_VALUE)
      && (this.scanInfo.minVersions == 0)) {
    CompactSelection expiredSelection = compactSelection
        .selectExpiredStoreFilesToCompact(
            EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
    // If there is any expired store files, delete them by compaction.
    if (expiredSelection != null) {
      return expiredSelection;
    }
  }
  // do not compact old files above a configurable threshold
  // save all references. we MUST compact them
  int pos = 0;
  while (pos < compactSelection.getFilesToCompact().size()
      && compactSelection.getFilesToCompact().get(pos)
          .getReader().length() > maxCompactSize
      && !compactSelection.getFilesToCompact().get(pos).isReference()) {
    ++pos;
  }
  if (pos != 0) {
    compactSelection.clearSubList(0, pos);
  }
}
Taking the two parts in turn, first:
// Delete the expired store files before the compaction selection.
if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
    && (ttl != Long.MAX_VALUE)
    && (this.scanInfo.minVersions == 0)) {
  CompactSelection expiredSelection = compactSelection
      .selectExpiredStoreFilesToCompact(
          EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
  // If there is any expired store files, delete them by compaction.
  if (expiredSelection != null) {
    return expiredSelection;
  }
}
This part deletes expired StoreFiles: if any exist, they themselves become the selection result and no further selection is performed. We set this case aside for now.
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
int pos = 0;
while (pos < compactSelection.getFilesToCompact().size()
    && compactSelection.getFilesToCompact().get(pos)
        .getReader().length() > maxCompactSize
    && !compactSelection.getFilesToCompact().get(pos).isReference()) {
  ++pos;
}
if (pos != 0) {
  compactSelection.clearSubList(0, pos);
}
compactSelection.getFilesToCompact() returns candidates. This code walks the list from front to back, discarding StoreFiles larger than the configured limit, and stops as soon as it meets either:
(1) a StoreFile whose size is at most maxCompactSize (hbase.hstore.compaction.max.size); or
(2) a StoreFile of type Reference (such files are produced during a split).
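The stop-at-first behavior is worth emphasizing: an oversized file can survive if a smaller one precedes it. A plain-Java sketch (hypothetical SkipLargeDemo class; sizes in MB, with maxCompactSize assumed to be 1024 MB):

```java
public class SkipLargeDemo {
    // mirrors the leading-file elimination: walk older -> newer and count how
    // many oversized, non-Reference files get dropped from the front
    static int firstKept(long[] sizes, boolean[] isReference, long maxCompactSize) {
        int pos = 0;
        while (pos < sizes.length && sizes[pos] > maxCompactSize && !isReference[pos]) {
            ++pos;
        }
        return pos;
    }

    public static void main(String[] args) {
        long[] sizes = {4000, 2000, 800, 3000, 100}; // older -> newer, in MB
        System.out.println(firstKept(sizes, new boolean[5], 1024));
        // 2: the leading 4000/2000 MB files are dropped, but the later
        // 3000 MB file survives because scanning stopped at the 800 MB file
    }
}
```

A Reference file also stops the scan immediately, since references must be compacted.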
if (compactSelection.getFilesToCompact().isEmpty()) {
  LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + this
      + ": no store files to compact");
  compactSelection.emptyFileList();
  return compactSelection;
}
If the elimination above leaves compactSelection.getFilesToCompact() empty, there is nothing to compact and the method returns immediately.
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction
boolean majorcompaction = (forcemajor && priority == PRIORITY_USER)
    || (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
    && (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact);
majorcompaction is true when either of the following holds:
(1) the user explicitly requested a major compaction;
(2) forcemajor is true, or the system's own major-compaction condition is met (judged mainly from StoreFile timestamps), and in addition the number of candidate StoreFiles is below the limit (maxFilesToCompact: hbase.hstore.compaction.max).
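Note that the expression relies on Java's operator precedence: && binds tighter than ||, so it parses as (forcemajor && user) || ((forcemajor || timeBased) && underMax). A sketch of only the boolean structure (hypothetical MajorFlagDemo class, not HBase code):

```java
public class MajorFlagDemo {
    // mirrors how the majorcompaction expression parses under Java precedence
    static boolean majorCompaction(boolean forcemajor, boolean userRequested,
                                   boolean timeBased, boolean underMax) {
        return (forcemajor && userRequested)
                || (forcemajor || timeBased) && underMax;
    }

    public static void main(String[] args) {
        // a user-forced major goes through even when over the file limit
        System.out.println(majorCompaction(true, true, false, false)); // true
        // a time-based major is suppressed when too many files are selected
        System.out.println(majorCompaction(false, false, true, false)); // false
    }
}
```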
User-requested and system-initiated major compactions are discussed later.
The code that follows branches into one of two large blocks, each covered in turn.
if (!majorcompaction
&& !hasReferences(compactSelection.getFilesToCompact())) {
......
} else {
......
}
(1) If majorcompaction is false and the remaining candidates contain no Reference files, the following code runs:
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double r = compactSelection.getCompactSelectionRatio();
r is the value of hbase.hstore.compaction.ratio; we ignore the off-peak settings hbase.offpeak.start.hour and hbase.offpeak.end.hour here.
// remove bulk import files that request to be excluded from minors
compactSelection.getFilesToCompact().removeAll(
Collections2.filter(compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));
StoreFiles flagged to be excluded from minor compactions (e.g. certain bulk-loaded files) are removed from the candidates.
// skip selection algorithm if we don't have enough files
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Not compacting files because we only have "
        + compactSelection.getFilesToCompact().size()
        + " files ready for compaction. Need "
        + this.minFilesToCompact + " to initiate.");
  }
  compactSelection.emptyFileList();
  return compactSelection;
}
If at this point fewer than minFilesToCompact StoreFiles remain, the compaction is cancelled and the method returns (note that, unlike the strict > in needsCompaction, a count equal to minFilesToCompact is enough here). The computation of minFilesToCompact, repeated here, is:
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2, conf.getInt(
    "hbase.hstore.compaction.min",
    /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
The selection algorithm then continues.
// get store file sizes for incremental compacting selection.
int countOfFiles = compactSelection.getFilesToCompact().size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
  StoreFile file = compactSelection.getFilesToCompact().get(i);
  fileSizes[i] = file.getReader().length();
  // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
  int tooFar = i + this.maxFilesToCompact - 1;
  sumSize[i] = fileSizes[i]
      + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
      - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
The code above is preparation; it builds two important arrays, fileSizes and sumSize:
fileSizes: the size of each StoreFile, in order;
sumSize: for each index i, the sum over the half-open window fileSizes[i, i + maxFilesToCompact - 1), i.e. the combined size of at most maxFilesToCompact - 1 StoreFiles in compactSelection.getFilesToCompact() starting at (and including) the i-th.
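The recurrence can be checked in isolation (a hypothetical SumSizeDemo sketch with made-up sizes, not HBase code). With maxFilesToCompact = 3 the window covers two files, so for sizes 50, 40, 30, 20, 10 the sums come out as 90, 70, 50, 30, 10:

```java
import java.util.Arrays;

public class SumSizeDemo {
    // mirrors the sumSize preparation: a right-to-left sliding-window sum
    // over the half-open range [i, i + maxFilesToCompact - 1)
    static long[] sumSizes(long[] fileSizes, int maxFilesToCompact) {
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            int tooFar = i + maxFilesToCompact - 1; // first index past the window
            sumSize[i] = fileSizes[i]
                    + (i + 1 < n ? sumSize[i + 1] : 0)   // extend by one element
                    - (tooFar < n ? fileSizes[tooFar] : 0); // drop the one leaving
        }
        return sumSize;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
                sumSizes(new long[]{50, 40, 30, 20, 10}, 3))); // [90, 70, 50, 30, 10]
    }
}
```

Each window sum is obtained in O(1) from its right neighbor, so the whole pass is O(n).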
With these two arrays in place, the selection continues:
/*
 * Start at the oldest file and stop when you find the first file
 * that meets compaction criteria:
 *
 * (1) a recently-flushed, small file (i.e. <= minCompactSize) OR
 * (2) within the compactRatio of sum(newer_files)
 *
 * Given normal skew, any newer files will also meet this criteria
 *
 * Additional Note: If fileSizes.size() >> maxFilesToCompact, we
 * will recurse on compact(). Consider the oldest files first to
 * avoid a situation where we always compact [end-threshold,end).
 * Then, the last file becomes an aggregate of the previous
 * compactions.
 */
while (countOfFiles - start >= this.minFilesToCompact
    && fileSizes[start] > Math.max(minCompactSize,
        (long) (sumSize[start + 1] * r))) {
  ++start;
}
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
long totalSize = fileSizes[start]
    + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);
compactSelection = compactSelection.getSubList(start, end);
Walking from front to back (flush time old to new), StoreFiles are eliminated one by one until the current StoreFile satisfies:
fileSizes[start] <= Math.max(minCompactSize, (long) (sumSize[start + 1] * r))
minCompactSize: hbase.hstore.compaction.min.size
Consider the case fileSizes[start] > (long) (sumSize[start + 1] * r) > minCompactSize: the current StoreFile is larger than r times the combined size of the windowed newer StoreFiles behind it. Such a file is judged too large to be worth re-merging, so it is skipped in favor of compacting the smaller, newer files after it.
Per the comment, HBase wants to start from a recently flushed, small StoreFile, one whose size satisfies either:
(1) at most minCompactSize; or
(2) at most the compaction ratio times sum(newer_files).
Finally, the code updates the compactSelection object with the chosen sublist [start, end).
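The elimination loop can be exercised on sample numbers (a hypothetical RatioSelectDemo sketch; sizes in MB, with r = 1.2, minCompactSize = 16 and minFilesToCompact = 3 assumed; sumSize here is a plain suffix sum, which is what the recurrence yields when maxFilesToCompact exceeds the file count):

```java
public class RatioSelectDemo {
    // mirrors the elimination loop: skip oldest files that are too large
    // relative to the ratio-scaled sum of the newer files behind them
    static int selectStart(long[] fileSizes, long[] sumSize, double r,
                           long minCompactSize, int minFilesToCompact) {
        int start = 0;
        while (fileSizes.length - start >= minFilesToCompact
                && fileSizes[start] > Math.max(minCompactSize,
                        (long) (sumSize[start + 1] * r))) {
            ++start;
        }
        return start;
    }

    public static void main(String[] args) {
        long[] fileSizes = {500, 80, 60, 40, 20};  // older -> newer, in MB
        long[] sumSize = {700, 200, 120, 60, 20};  // suffix sums of fileSizes
        // 500 > 1.2 * 200 = 240, so the oldest file is skipped;
        // 80 <= 1.2 * 120 = 144, so selection starts at index 1
        System.out.println(selectStart(fileSizes, sumSize, 1.2, 16, 3)); // 1
    }
}
```

With the skew the diagram assumes, once one file passes the test every newer file does too, so a single forward scan suffices.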
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Skipped compaction of " + this + ". Only "
        + (end - start) + " file(s) of size "
        + StringUtils.humanReadableInt(totalSize)
        + " have met compaction criteria.");
  }
  compactSelection.emptyFileList();
  return compactSelection;
}
If not enough StoreFiles remain (fewer than minFilesToCompact), this compaction is skipped and the method returns.
(2) If majorcompaction is true, or the candidate StoreFiles include a Reference file, the following code runs instead:
if (majorcompaction) {
  if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
    LOG.debug("Warning, compacting more than " + this.maxFilesToCompact
        + " files, probably because of a user-requested major compaction");
    if (priority != PRIORITY_USER) {
      LOG.error("Compacting more than max files on a non user-requested compaction");
    }
  }
} else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
  // all files included in this compaction, up to max
  int pastMax = compactSelection.getFilesToCompact().size()
      - this.maxFilesToCompact;
  compactSelection.getFilesToCompact().subList(0, pastMax).clear();
}
If majorcompaction is true, the code merely logs when more than maxFilesToCompact (hbase.hstore.compaction.max) StoreFiles have been selected;
if majorcompaction is false and the selection exceeds maxFilesToCompact, the oldest StoreFiles at the front of the list are dropped until exactly maxFilesToCompact remain.
return compactSelection;
This concludes the selection algorithm.