HBase Compact

A Region compaction request is triggered after the Region's MemStore flush:

boolean shouldCompact = region.flushcache();

// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
    this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
    server.compactSplitThread.requestCompaction(region, getName());
}
server.getMetrics().addFlush(region.getRecentFlushInfo());

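The region.flushcache method performs the flush for this Region instance. A return value of true (shouldCompact) means the Region meets the conditions for a compaction, but a compaction request is not necessarily issued: the shouldSplit check comes first, so a compaction is requested only when shouldSplit is false and shouldCompact is true.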
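
The branch order described above can be condensed into a small sketch. The class, enum, and method names (`FlushFollowUpSketch`, `FlushFollowUp`, `decide`) are hypothetical and only mirror the order of the checks; they are not part of HBase.

```java
// Hypothetical sketch: mirrors only the branch order that follows a flush.
final class FlushFollowUpSketch {
    enum FlushFollowUp { SPLIT, COMPACT, NONE }

    /** A split request takes precedence over a compaction request. */
    static FlushFollowUp decide(boolean shouldSplit, boolean shouldCompact) {
        if (shouldSplit) {
            return FlushFollowUp.SPLIT;
        } else if (shouldCompact) {
            return FlushFollowUp.COMPACT;
        }
        return FlushFollowUp.NONE;
    }

    public static void main(String[] args) {
        // Both flags set: the split branch is taken and no compaction is requested.
        System.out.println(decide(true, true));
    }
}
```

Modelling the outcome as a single value makes it explicit that a flush leads to at most one of the two requests.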

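Note: the flush, split, and compaction of a Region run in separate threads. How the three are coordinated will be covered in detail later; here we focus only on what triggers a compaction and how StoreFiles are selected for merging when one runs.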

Whether a Region compaction is triggered (that is, whether shouldCompact is true) depends on the number of StoreFiles in each Store of the Region:

/**
* See if there's too much store files in this store
*
* @return true if number of store files is greater than the number defined
* in minFilesToCompact
*/
public boolean needsCompaction() {
    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
}

When a Region is flushed, its Stores are flushed one after another, and after each Store finishes flushing, the check above is run:

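storefiles: the StoreFiles in the Store after the flush; its size is the total file count;

filesCompacting: the StoreFiles in the Store that are currently being compacted, i.e. files already selected for a compaction; this check ignores them;

minFilesToCompact: the lower bound on the number of StoreFiles in the Store (not counting those already selected for compaction) required for a Store compaction. It is computed as follows: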

// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2, conf.getInt(
        "hbase.hstore.compaction.min",
        /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));

Note: hbase.hstore.compactionThreshold is the old name of this setting; it is ignored whenever hbase.hstore.compaction.min is present in the configuration.
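
To make the fallback between the two keys concrete, here is a minimal sketch. It assumes a plain `Map<String, String>` as a stand-in for the HBase configuration object; `MinFilesSketch` and its helper `getInt` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map stands in for the HBase configuration object.
final class MinFilesSketch {
    static int getInt(Map<String, String> conf, String key, int defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    /** The new key wins, the old key is only a fallback, and the result is never below 2. */
    static int minFilesToCompact(Map<String, String> conf) {
        int oldValue = getInt(conf, "hbase.hstore.compactionThreshold", 3);
        return Math.max(2, getInt(conf, "hbase.hstore.compaction.min", oldValue));
    }

    /** Same comparison as needsCompaction(): files already being compacted are ignored. */
    static boolean needsCompaction(int storeFiles, int filesCompacting, int minFilesToCompact) {
        return (storeFiles - filesCompacting) > minFilesToCompact;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("hbase.hstore.compactionThreshold", "5");
        conf.put("hbase.hstore.compaction.min", "4");
        System.out.println(minFilesToCompact(conf)); // the new key takes effect
    }
}
```

Note that the comparison in `needsCompaction` is strict: with the default of 3, a Store needs 4 StoreFiles that are not already being compacted before the check returns true.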

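As shown above, shouldCompact is true only if the number of StoreFiles in some Store of the Region, not counting files already being compacted, exceeds minFilesToCompact (needsCompaction uses a strict >). If so, the actual compaction request is issued through requestCompaction. The method has several overloads, and execution eventually reaches the following ones: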

@Override
public synchronized List<CompactionRequest> requestCompaction(
        final HRegion r, final String why, int pri,
        final List<CompactionRequest> requests) throws IOException {
    List<CompactionRequest> ret;
    // not a special compaction request, so make our own list
    if (requests == null) {
        ret = new ArrayList<CompactionRequest>(r.getStores().size());
        for (Store s : r.getStores().values()) {
            ret.add(requestCompaction(r, s, why, pri, null));
        }
    } else {
        ret = new ArrayList<CompactionRequest>(requests.size());
        for (CompactionRequest request : requests) {
            ret.add(requestCompaction(r, request.getStore(), why, pri, request));
        }
    }
    return ret;
}

As the code shows, the actual compaction requests are issued per Store:

@Override
public synchronized CompactionRequest requestCompaction(final HRegion r,
        final Store s, final String why, int priority,
        CompactionRequest request) throws IOException {
    if (this.server.isStopped()) {
        return null;
    }
    CompactionRequest cr = s.requestCompaction(priority, request);
    if (cr != null) {
        cr.setServer(server);
        if (priority != Store.NO_PRIORITY) {
            cr.setPriority(priority);
        }
        ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
                ? largeCompactions : smallCompactions;
        pool.execute(cr);
        if (LOG.isDebugEnabled()) {
            String type = (pool == smallCompactions) ? "Small " : "Large ";
            LOG.debug(type + "Compaction requested: " + cr
                    + (why != null && !why.isEmpty() ? "; Because: " + why : "")
                    + "; " + this);
        }
    } else {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Not compacting " + r.getRegionNameAsString()
                    + " because compaction request was cancelled");
        }
    }
    return cr;
}

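First, a CompactionRequest object is built from the state of the Store. It implements Runnable, so a "request" is really just a Runnable that a thread pool executes. Then, based on the total size (not the number) of the StoreFiles selected for compaction in this Store, the request is classified as a large or a small compaction and submitted to the corresponding pool (largeCompactions or smallCompactions).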

The choice between a large and a small compaction is made as follows:

boolean throttleCompaction(long compactionSize) {
    long throttlePoint = conf.getLong(
            "hbase.regionserver.thread.compaction.throttle",
            2 * this.minFilesToCompact * this.region.memstoreFlushSize);
    return compactionSize > throttlePoint;
}
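
As a worked example of the default throttle point, the sketch below plugs in numbers. The 128 MB flush size is an assumed value chosen for illustration, and `ThrottleSketch` with its two methods is hypothetical; only the formula comes from the code above.

```java
// Hypothetical sketch of the default throttle point; the flush size used in main is assumed.
final class ThrottleSketch {
    /** Default used when hbase.regionserver.thread.compaction.throttle is not set. */
    static long defaultThrottlePoint(int minFilesToCompact, long memstoreFlushSize) {
        return 2L * minFilesToCompact * memstoreFlushSize;
    }

    /** true means the large pool, false means the small pool. */
    static boolean isLargeCompaction(long compactionSize, long throttlePoint) {
        return compactionSize > throttlePoint;
    }

    public static void main(String[] args) {
        long flushSize = 128L * 1024 * 1024; // assumed 128 MB MemStore flush size
        long throttlePoint = defaultThrottlePoint(3, flushSize);
        System.out.println(throttlePoint); // 805306368 bytes, i.e. 768 MB
        System.out.println(isLargeCompaction(throttlePoint + 1, throttlePoint));
    }
}
```

Under these assumed values, a request goes to the large pool only when the selected StoreFiles total more than 768 MB.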

At this point the CompactionRequest has been submitted to the appropriate thread pool and will run when the pool's scheduling policy allows.

CompactionRequest

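How is a CompactionRequest built from the state of a Store, that is, which StoreFiles in the Store are chosen for compaction? This is done by the Store's requestCompaction method; its core code is walked through below.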

Most of the work happens inside a synchronized block:

synchronized (filesCompacting) {
    ......
}

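As the MemStore keeps flushing, several compaction requests may be issued for the same Region. To keep any StoreFile from appearing in more than one compaction, access to filesCompacting (the StoreFiles already selected for compaction) must be mutually exclusive.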

All of the code that follows is inside this synchronized block.

// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);

All StoreFiles in the Store are taken as candidates. storefiles is sorted by StoreFile.Comparators.FLUSH_TIME, so candidates, and later filesCompacting, are ordered as well (older -> newer).

if (!filesCompacting.isEmpty()) {
    // exclude all files older than the newest file we're currently
    // compacting. this allows us to preserve contiguity (HBASE-2856)
    StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
    int idx = candidates.indexOf(last);
    Preconditions.checkArgument(idx != -1);
    candidates.subList(0, idx + 1).clear();
}

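If filesCompacting is not empty, some StoreFiles have to be removed from candidates:

(1) Take the last StoreFile in filesCompacting, i.e. the one with the most recent flush time, and store it in the variable last;

(2) Remove last itself and every StoreFile older than it from candidates.

The purpose is to keep compactions contiguous (HBASE-2856): each selection merges as many newly produced StoreFiles as possible and never picks up StoreFiles that have already been merged or are currently being compacted.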
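
The trimming of candidates can be sketched with plain strings. File names stand in for StoreFile objects here, and `CandidateSketch` is a hypothetical name; the list operations are the same ones the HBase code uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: file names stand in for StoreFiles, ordered oldest to newest.
final class CandidateSketch {
    static List<String> candidates(List<String> storefiles, List<String> filesCompacting) {
        List<String> candidates = new ArrayList<>(storefiles);
        if (!filesCompacting.isEmpty()) {
            String last = filesCompacting.get(filesCompacting.size() - 1);
            int idx = candidates.indexOf(last);
            if (idx == -1) {
                throw new IllegalArgumentException("compacting file not in store: " + last);
            }
            // Drop 'last' itself and everything older than it.
            candidates.subList(0, idx + 1).clear();
        }
        return candidates;
    }

    public static void main(String[] args) {
        List<String> storefiles = Arrays.asList("f1", "f2", "f3", "f4", "f5");
        List<String> compacting = Arrays.asList("f2", "f3");
        System.out.println(candidates(storefiles, compacting)); // [f4, f5]
    }
}
```

In this example f1 is dropped even though it is not being compacted: it is older than f3, so including it would produce a selection that is not contiguous in flush order.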

boolean override = false;

if (region.getCoprocessorHost() != null) {
    override = region.getCoprocessorHost().preCompactSelection(
            this, candidates, request);
}
CompactSelection filesToCompact;
if (override) {
    // coprocessor is overriding normal file selection
    filesToCompact = new CompactSelection(conf, candidates);
} else {
    filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
    region.getCoprocessorHost().postCompactSelection(
            this,
            ImmutableList.copyOf(filesToCompact.getFilesToCompact()),
            request);
}

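The effect of coprocessors on compaction is not discussed here, so override is false and the compactSelection algorithm (described in detail later) performs the selection. The result is stored in filesToCompact.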

// no files to compact
if (filesToCompact.getFilesToCompact().isEmpty()) {
    return null;
}

If the selection comes back empty, no compaction is needed and null is returned.

// basic sanity check: do not try to compact the same StoreFile
// twice.
if (!Collections.disjoint(filesCompacting,
filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after
// sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}

This is a sanity check on the selection: none of the selected files may already be in filesCompacting, so the same StoreFile is never compacted twice.

filesCompacting.addAll(filesToCompact.getFilesToCompact());

Collections.sort(filesCompacting,
StoreFile.Comparators.FLUSH_TIME);

The selection is added to filesCompacting, and the updated filesCompacting is re-sorted by flush time.

// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
    // since we're enqueuing a major, update the compaction wait interval
    this.forceMajor = false;
}

If the selection contains every StoreFile in the Store, this is a major compaction, and the pending forced major compaction flag (forceMajor) is cleared.

// everything went better than expected. create a compaction
// request
int pri = getCompactPriority(priority);

Compute the priority of this compaction:

/**
* @return The priority that this store should have in the compaction queue
* @param priority
*/
public int getCompactPriority(int priority) {
    // If this is a user-requested compaction, leave this at the highest
    // priority
    if (priority == PRIORITY_USER) {
        return PRIORITY_USER;
    } else {
        return this.blockingStoreFileCount - this.storefiles.size();
    }
}
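
The arithmetic in getCompactPriority can be tried out in isolation. In this sketch `PrioritySketch` and `compactPriority` are hypothetical names, the blocking count is passed in as a parameter instead of being read from configuration, and the value 1 for PRIORITY_USER and the default of 7 follow the surrounding text.

```java
// Hypothetical sketch of the priority arithmetic with plain ints.
final class PrioritySketch {
    static final int PRIORITY_USER = 1;

    static int compactPriority(int requested, int blockingStoreFileCount, int storeFileCount) {
        if (requested == PRIORITY_USER) {
            return PRIORITY_USER;
        }
        // The result shrinks as the Store approaches the blocking file count.
        return blockingStoreFileCount - storeFileCount;
    }

    public static void main(String[] args) {
        // Non-user request, blocking count 7, Store currently holds 5 StoreFiles.
        System.out.println(compactPriority(5, 7, 5)); // 2
    }
}
```

For system-triggered requests the value depends only on how close the Store is to the blocking file count, and it can become zero or negative once that count is reached or exceeded.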

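If the compaction request was issued by a user, the compaction gets the highest priority, PRIORITY_USER (1). Otherwise the priority is the difference blockingStoreFileCount - storefiles.size(), where blockingStoreFileCount comes from the following code: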

this.blockingStoreFileCount = conf.getInt(
"hbase.hstore.blockingStoreFiles", 7);

The code below creates the actual CompactionRequest object. In the flow described here request is null, so the object is simply created through the constructor.

// not a special compaction request, so we need to make one
if (request == null) {
    request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
} else {
    // update the request with what the system thinks the request should be
    // its up to the request if it wants to listen
    request.setSelection(filesToCompact);
    request.setIsMajor(isMajor);
    request.setPriority(pri);
}

This completes the construction of a CompactionRequest.

CompactSelection

The algorithm is implemented by the compactSelection method:

/**
* Algorithm to choose which files to compact
*
* Configuration knobs:
*
* "hbase.hstore.compaction.ratio" normal case: minor compact when file <=
* sum(smaller_files) * ratio
*
* "hbase.hstore.compaction.min.size" unconditionally compact individual
* files below this size
*
* "hbase.hstore.compaction.max.size" never compact individual files above
* this size (unless splitting)
*
* "hbase.hstore.compaction.min" min files needed to minor compact
*
* "hbase.hstore.compaction.max" max files to compact at once (avoids OOM)
*
* @param candidates
* candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
CompactSelection compactSelection(List<StoreFile> candidates, int priority)
throws IOException {
......
}

The method begins with the following comments:

// ASSUMPTION!!! filesCompacting is locked when calling this function

/*
 * normal skew:
 *
 *         older ----> newer
 *     _
 *    | |   _
 *    | |  | |   _
 *  --|-|- |-|- |-|---_-------_-------  minCompactSize
 *    | |  | |  | |  | |  _  | |
 *    | |  | |  | |  | | | | | |
 *    | |  | |  | |  | | | | | |
 */

Selection proceeds from the oldest file to the newest; how minCompactSize is used appears in the code further below.

CompactSelection compactSelection = new CompactSelection(conf,
candidates);

A CompactSelection object is created; its constructor mainly initializes a few parameters.

boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();

forcemajor is computed from forceMajor (already set before compactSelection is called) and the state of filesCompacting. If forcemajor is false, the following code runs:

if (!forcemajor) {
    // Delete the expired store files before the compaction selection.
    if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
            && (ttl != Long.MAX_VALUE)
            && (this.scanInfo.minVersions == 0)) {
        CompactSelection expiredSelection = compactSelection
                .selectExpiredStoreFilesToCompact(
                        EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
        // If there is any expired store files, delete them by compaction.
        if (expiredSelection != null) {
            return expiredSelection;
        }
    }
    // do not compact old files above a configurable threshold
    // save all references. we MUST compact them
    int pos = 0;
    while (pos < compactSelection.getFilesToCompact().size()
            && compactSelection.getFilesToCompact().get(pos).getReader().length() > maxCompactSize
            && !compactSelection.getFilesToCompact().get(pos).isReference()) {
        ++pos;
    }
    if (pos != 0) {
        compactSelection.clearSubList(0, pos);
    }
}

Within this block:

// Delete the expired store files before the compaction selection.
if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
        && (ttl != Long.MAX_VALUE)
        && (this.scanInfo.minVersions == 0)) {
    CompactSelection expiredSelection = compactSelection
            .selectExpiredStoreFilesToCompact(
                    EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
    // If there is any expired store files, delete them by compaction.
    if (expiredSelection != null) {
        return expiredSelection;
    }
}

This part of the code removes expired StoreFiles. If any expired StoreFiles exist, they become the result of the selection and no further selection is needed. We ignore this step for now.

// do not compact old files above a configurable threshold
// save all references. we MUST compact them
int pos = 0;
while (pos < compactSelection.getFilesToCompact().size()
        && compactSelection.getFilesToCompact().get(pos).getReader().length() > maxCompactSize
        && !compactSelection.getFilesToCompact().get(pos).isReference()) {
    ++pos;
}
if (pos != 0) {
    compactSelection.clearSubList(0, pos);
}

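compactSelection.getFilesToCompact() returns candidates. Working from the front of the list (oldest first), this code drops StoreFiles whose size exceeds the configured limit, and stops as soon as it meets either of the following:

(1) a StoreFile whose size is less than or equal to maxCompactSize (hbase.hstore.compaction.max.size);

(2) a StoreFile of type Reference (this file type is produced during a split).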
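
The two stop conditions can be seen in a small sketch. `OversizeSkipSketch` and its nested `FileInfo` class are hypothetical stand-ins for the selection and for StoreFile; only the loop condition is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: FileInfo stands in for StoreFile (size plus a reference flag).
final class OversizeSkipSketch {
    static final class FileInfo {
        final long length;
        final boolean reference;

        FileInfo(long length, boolean reference) {
            this.length = length;
            this.reference = reference;
        }
    }

    /** Returns what remains after dropping leading oversized, non-reference files. */
    static List<FileInfo> skipLeadingOversized(List<FileInfo> oldestFirst, long maxCompactSize) {
        int pos = 0;
        while (pos < oldestFirst.size()
                && oldestFirst.get(pos).length > maxCompactSize
                && !oldestFirst.get(pos).reference) {
            ++pos;
        }
        return new ArrayList<>(oldestFirst.subList(pos, oldestFirst.size()));
    }

    public static void main(String[] args) {
        List<FileInfo> files = Arrays.asList(
                new FileInfo(500, false), new FileInfo(400, false),
                new FileInfo(50, false), new FileInfo(600, false));
        System.out.println(skipLeadingOversized(files, 100).size()); // 2
    }
}
```

Only the leading run of oversized files is dropped. In the example the last file (size 600) stays in the selection because the scan has already stopped at the file of size 50.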

if (compactSelection.getFilesToCompact().isEmpty()) {
    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + this
            + ": no store files to compact");
    compactSelection.emptyFileList();
    return compactSelection;
}

If compactSelection.getFilesToCompact() is empty after the previous step, no StoreFile is available for compaction; the selection ends and the method returns.

// Force a major compaction if this is a user-requested major
// compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction
boolean majorcompaction = (forcemajor && priority == PRIORITY_USER)
        || (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
        && (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact);

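majorcompaction is true when either of the following holds:

(1) a user explicitly requested a major compaction (forcemajor is true and priority is PRIORITY_USER);

(2) forcemajor is true, or the conditions for a system-initiated major compaction are met (judged mainly from StoreFile timestamps), and the number of candidate StoreFiles is below the limit (maxFilesToCompact: hbase.hstore.compaction.max).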
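
Because the expression mixes || and && without full parenthesization, a sketch with explicit parentheses may help. The boolean parameters are hypothetical stand-ins for the values computed inside compactSelection, and `MajorDecisionSketch` is not an HBase class.

```java
// Hypothetical sketch: plain inputs stand in for the values computed in compactSelection.
final class MajorDecisionSketch {
    static boolean majorCompaction(boolean forceMajor, boolean userRequested,
                                   boolean periodicMajorDue, int fileCount,
                                   int maxFilesToCompact) {
        // && binds tighter than ||, so the file-count limit applies only to the second branch.
        return (forceMajor && userRequested)
                || ((forceMajor || periodicMajorDue) && (fileCount < maxFilesToCompact));
    }

    public static void main(String[] args) {
        // A user-requested forced major compaction ignores the file-count limit.
        System.out.println(majorCompaction(true, true, false, 20, 10));
        // The same request without user priority is held back by the limit.
        System.out.println(majorCompaction(true, false, false, 20, 10));
    }
}
```

The first call yields true and the second false, which is why the later logging branch mentions a user-requested major compaction when more than maxFilesToCompact files are selected.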

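User-requested and system-initiated major compactions will be discussed later.

The code that follows splits into two large branches depending on these conditions; each is described in turn.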

if (!majorcompaction
        && !hasReferences(compactSelection.getFilesToCompact())) {
    ......
} else {
    ......
}

(1) If majorcompaction is false and the remaining candidates contain no Reference files, the following code runs:

// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double r = compactSelection.getCompactSelectionRatio();

r is the value of hbase.hstore.compaction.ratio; the off-peak settings hbase.offpeak.start.hour and hbase.offpeak.end.hour are not considered here.

// remove bulk import files that request to be excluded from minors
compactSelection.getFilesToCompact().removeAll(
Collections2.filter(compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
}
}));

StoreFiles that have been marked for exclusion from minor compactions are removed from the candidates.

// skip selection algorithm if we don't have enough files
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have "
                + compactSelection.getFilesToCompact().size()
                + " files ready for compaction. Need "
                + this.minFilesToCompact + " to initiate.");
    }
    compactSelection.emptyFileList();
    return compactSelection;
}

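If the number of candidates is now less than minFilesToCompact, this compaction is cancelled and the method returns. (When configuring the value, note that the comparison is strict: a count equal to minFilesToCompact does not cancel.) minFilesToCompact is computed as follows: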

// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2, conf.getInt(
"hbase.hstore.compaction.min",
/* old name */conf
.getInt("hbase.hstore.compactionThreshold", 3)));

The selection algorithm continues.

// get store file sizes for incremental compacting selection.
int countOfFiles = compactSelection.getFilesToCompact().size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
    StoreFile file = compactSelection.getFilesToCompact().get(i);
    fileSizes[i] = file.getReader().length();
    // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
    int tooFar = i + this.maxFilesToCompact - 1;
    sumSize[i] = fileSizes[i]
            + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
            - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}

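The code above is preparation: it builds two important arrays, fileSizes and sumSize.

fileSizes: the sizes of the StoreFiles, in order;

sumSize: parallel to fileSizes; sumSize[i] holds the sum of fileSizes[i, i+maxFilesToCompact-1), i.e. the total size of at most maxFilesToCompact - 1 consecutive StoreFiles of compactSelection.getFilesToCompact(), starting at and including the i-th.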
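
A worked example makes the sliding window easier to follow. The sketch below applies the same recurrence to plain long values; `SumSizeSketch` and `sumSizes` are hypothetical names, and the sizes are made-up numbers.

```java
import java.util.Arrays;

// Hypothetical sketch of the sliding-window sums on plain long sizes.
final class SumSizeSketch {
    static long[] sumSizes(long[] fileSizes, int maxFilesToCompact) {
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            // First index that falls outside the window starting at i.
            int tooFar = i + maxFilesToCompact - 1;
            sumSize[i] = fileSizes[i]
                    + ((i + 1 < n) ? sumSize[i + 1] : 0)
                    - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        return sumSize;
    }

    public static void main(String[] args) {
        long[] sizes = {100, 50, 23, 12, 12};
        // With maxFilesToCompact = 3 each window covers at most 2 files.
        System.out.println(Arrays.toString(sumSizes(sizes, 3))); // [150, 73, 35, 24, 12]
    }
}
```

Filling the array from the back lets each entry reuse the sum to its right, so the whole array is computed in a single pass instead of re-adding every window.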

With these two arrays, the selection algorithm continues:

/*
* Start at the oldest file and stop when you find the first file
* that meets compaction criteria:
*
* (1) a recently-flushed, small file (i.e. <= minCompactSize) OR
* (2) within the compactRatio of sum(newer_files)
*
* Given normal skew, any newer files will also meet this criteria
*
* Additional Note: If fileSizes.size() >> maxFilesToCompact, we
* will recurse on compact(). Consider the oldest files first to
* avoid a situation where we always compact [end-threshold,end).
* Then, the last file becomes an aggregate of the previous
* compactions.
*/
while (countOfFiles - start >= this.minFilesToCompact
        && fileSizes[start] > Math.max(minCompactSize,
                (long) (sumSize[start + 1] * r))) {
    ++start;
}
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
long totalSize = fileSizes[start]
        + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);
compactSelection = compactSelection.getSubList(start, end);

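Working from front to back (oldest to newest by flush time), StoreFiles are dropped one by one until a StoreFile satisfies:

fileSizes[start] <= Math.max(minCompactSize, (long) (sumSize[start + 1] * r))

minCompactSize: hbase.hstore.compaction.min.size

Consider one case: fileSizes[start] > (long) (sumSize[start + 1] * r) > minCompactSize. It means the current StoreFile is larger than r times the combined size of the StoreFiles that follow it within the maxFilesToCompact window. Such a file is considered too large and is dropped, so that the smaller StoreFiles after it are merged first.

According to the comment, HBase expects a recently flushed, small StoreFile to satisfy one of the following two conditions:

(1) its size is less than or equal to minCompactSize;

(2) its size is within the compactRatio of sum(newer_files).

Finally, the code above updates the compactSelection object.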
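
The loop can be run on sample numbers to see which window it picks. This is a minimal sketch on plain long sizes, assuming minFiles >= 2 and at least minFiles input files (which the earlier check guarantees in the real code); `RatioSelectionSketch` and `select` are hypothetical names and the sizes are made up.

```java
import java.util.Arrays;

// Hypothetical sketch of the minor-compaction window selection on plain long sizes.
// Assumes minFiles >= 2, as enforced by Math.max(2, ...) in the configuration code.
final class RatioSelectionSketch {
    /** Returns {start, end}: the half-open index range [start, end) that is selected. */
    static int[] select(long[] fileSizes, int minFiles, int maxFiles,
                        long minCompactSize, double ratio) {
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            int tooFar = i + maxFiles - 1;
            sumSize[i] = fileSizes[i]
                    + ((i + 1 < n) ? sumSize[i + 1] : 0)
                    - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        int start = 0;
        // Skip old files that are too large relative to the newer files after them.
        while (n - start >= minFiles
                && fileSizes[start] > Math.max(minCompactSize,
                        (long) (sumSize[start + 1] * ratio))) {
            ++start;
        }
        int end = Math.min(n, start + maxFiles);
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        long[] sizes = {100, 50, 23, 12, 12};
        System.out.println(Arrays.toString(select(sizes, 3, 10, 10, 1.0))); // [2, 5]
    }
}
```

With these inputs, 100 is skipped because it exceeds 97 (the sum of the newer files), 50 is skipped because it exceeds 47, and 23 is accepted because it does not exceed 24, so the three newest files are selected. If the loop advances so far that fewer than minFiles files remain, the range it returns is too short and the subsequent size check discards it.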

// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Skipped compaction of " + this + ". Only "
                + (end - start) + " file(s) of size "
                + StringUtils.humanReadableInt(totalSize)
                + " have met compaction criteria.");
    }
    compactSelection.emptyFileList();
    return compactSelection;
}

If there are not enough StoreFiles to compact at this point (fewer than minFilesToCompact), this compaction is skipped and the method returns.

(2) If majorcompaction is true, or the candidates contain Reference files, the following code runs:

if (majorcompaction) {
    if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
        LOG.debug("Warning, compacting more than "
                + this.maxFilesToCompact
                + " files, probably because of a user-requested major compaction");
        if (priority != PRIORITY_USER) {
            LOG.error("Compacting more than max files on a non user-requested compaction");
        }
    }
} else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
    // all files included in this compaction, up to max
    int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
    compactSelection.getFilesToCompact().subList(0, pastMax).clear();
}

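If majorcompaction is true, the code only logs a message when the number of StoreFiles selected for compaction exceeds maxFilesToCompact (hbase.hstore.compaction.max);

If majorcompaction is false and the number of selected StoreFiles exceeds maxFilesToCompact (hbase.hstore.compaction.max), the surplus StoreFiles are removed from the front of the list, i.e. the oldest ones are dropped.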
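
Which end of the list is trimmed is easy to get wrong, so here is a minimal sketch of that last branch. File names stand in for StoreFiles, and `TrimToMaxSketch` with its method `trimOldest` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: names stand in for StoreFiles, ordered oldest to newest.
final class TrimToMaxSketch {
    static List<String> trimOldest(List<String> oldestFirst, int maxFilesToCompact) {
        List<String> selected = new ArrayList<>(oldestFirst);
        if (selected.size() > maxFilesToCompact) {
            int pastMax = selected.size() - maxFilesToCompact;
            // subList(0, pastMax) is the oldest end of the list.
            selected.subList(0, pastMax).clear();
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("f1", "f2", "f3", "f4", "f5");
        System.out.println(trimOldest(files, 3)); // [f3, f4, f5]
    }
}
```

The newest maxFilesToCompact files survive the trim, so the selection stays contiguous with the most recent flushes.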

return compactSelection;

This concludes the selection algorithm.
