HBase源码分析之HRegionServer上compact流程分析

        前面三篇文章中,我们详细叙述了compact流程是如何在HRegion上进行的,了解了它的很多细节方面的问题。但是,这个compact在HRegionServer上是如何进行的?合并时文件是如何选择的呢?在这篇文章中,你将找到答案!

        首先,在HRegionServer内部,我们发现,它定义了一个CompactSplitThread类型的成员变量compactSplitThread,单看字面意思,这就是一个合并分裂线程,那么它会不会就是HRegionServer上具体执行合并的工作线程呢?我们一步一步来看。

        要了解它是什么,能够做什么,那么就必须要看看它的实现,找到CompactSplitThread类,so,开始我们的分析之旅吧!

        首先,看下CompactSplitThread中都定义可哪些变量,如下:

// Configuration key for the large compaction threads.
  // large合并线程数参数hbase.regionserver.thread.compaction.large,默认值为1
  public final static String LARGE_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.large";
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
  
  // Configuration key for the small compaction threads.
  // small合并线程数参数hbase.regionserver.thread.compaction.small,默认值为1
  public final static String SMALL_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.small";
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
  
  // Configuration key for split threads
  // 分裂线程数参数hbase.regionserver.thread.split,默认值为1
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public final static int SPLIT_THREADS_DEFAULT = 1;
  
  // Configuration keys for merge threads
  // merge合并线程数参数hbase.regionserver.thread.merge,默认值为1
  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
  public final static int MERGE_THREADS_DEFAULT = 1;

  // Region分分裂的限制
  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
      "hbase.regionserver.regionSplitLimit";
  // Region分分裂的限制默认值,为1000
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
  
  // HRegionServer实例server
  private final HRegionServer server;
  
  // Configuration配置实例conf
  private final Configuration conf;

  // long合并线程池longCompactions
  private final ThreadPoolExecutor longCompactions;
  // short合并线程池shortCompactions
  private final ThreadPoolExecutor shortCompactions;
  // 分裂线程池splits
  private final ThreadPoolExecutor splits;
  // merge合并线程池mergePool
  private final ThreadPoolExecutor mergePool;

  /**
   * Splitting should not take place if the total number of regions exceed this.
   * 如果region的总数超过这个限制,split就不应该发生。
   * This is not a hard limit to the number of regions but it is a guideline to
   * stop splitting after number of online regions is greater than this.
   * 这不是一个硬性的Region数目的限制,但是如果在线region的数目超过此限制它会是一个停止split的指南。
   */
  private int regionSplitLimit;
        其中,关于Region的Spilt、Merge相关的成员变量我们暂时忽略,等到专门讲解split、merge时再单独介绍。这里,先了解下CompactSplitThread中都有哪些关于compact的成员变量,大体可以分为三类:

        1、第一类是配置参数及其默认值相关的,涉及到large、small合并线程数参数和其默认值以及HBase整体配置变量Configuration类型的conf;

        2、第二类是线程池,包括long合并线程池longCompactions和short合并线程池shortCompactions,它们统一使用的Java中的ThreadPoolExecutor;

        3、第三类是CompactSplitThread的载体,或者说工作的环境,HRegionServer实例server。

        既然已经存在合并的线程池,那么很简单,将合并线程扔到线程池中等待调度就是了。那么是由哪些方法来完成的这一步呢?答案就在requestCompaction()及requestSystemCompaction()系列方法,而这一系列的requestCompaction()和requestSystemCompaction()方法参数不同,也仅意味着应用场景不同而已,最终还是要落到requestCompactionInternal()方法上的。同时,需要强调一点,requestCompaction()方法和requestSystemCompaction()方法有一个显著的区别,那就是在最终调用requestCompactionInternal()方法时,前者传入的selectNow为true,而后者传入的selectNow为false,这点需要特别注意下,下面也会讲到。先撇开都哪些地方会调用requestCompaction()系列方法,也就是compact发起的时机、条件等,我们后续会分析,这里我们先来看下requestCompactionInternal(),代码如下:

/**
   * @param r HRegion store belongs to
   * @param s Store to request compaction on
   * @param why Why compaction requested -- used in debug messages
   * @param priority override the default priority (NO_PRIORITY == decide)
   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
   *          compaction will be used.
   */
  private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request, boolean selectNow)
          throws IOException {
    
	// 首选做一些必要的环境判断,比如HRegionServer是否已停止、HRegion对应的表是否允许Compact操作
	if (this.server.isStopped()
        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
      return null;
    }

    CompactionContext compaction = null;
    // 系统自动触发的system compaction,selectNow参数为false,如果是hbase shell等人为触发的合并,则selectNow为true
    if (selectNow) {
      // 通过hbase shell触发的major compaction,selectNow为true.这里进行实际的选取待合并文件操作
      compaction = selectCompaction(r, s, priority, request);
      if (compaction == null) return null; // message logged inside
    }

    // We assume that most compactions are small. So, put system compactions into small
    // pool; we will do selection there, and move to large pool if necessary.
    // 我们假设大部分合并都是small。所以,将系统引发的合并放进small pool,在那里我们会做出选择,如果有必要的话会挪至large pool
    
    // 也就是说,如果selectNow为false,即系统自身引发的合并,比如MemStore flush、compact检查线程等,统一放入到shortCompactions中,即small pool
    // 而如果是人为触发的,比如HBase shell,则还要看HStore中合并请求大小是否超过阈值,超过则放入longCompactions,即large pool,否则还是small pool
    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
      ? longCompactions : shortCompactions;
    
    // 将合并请求包装成CompactionRunner,扔进线程池去执行
    pool.execute(new CompactionRunner(s, r, compaction, pool));
    
    // 记录debug级别的LOG信息
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
    
    // 返回,如果是人为触发的,返回合并请求,否则返回null
    return selectNow ? compaction.getRequest() : null;
  }
        直接说下大体流程吧!首先,需要做一些必要的检查,比如比如HRegionServer是否已停止、HRegion对应的表是否允许Compact操作等,然后这里有一个关键的地方,就是上述的selectNow,如果不是system compaction,selectNow为true,也就意味着它需要调用selectCompaction()方法,获取CompactionContext,而这本质上就是要选取待合并文件。我们先看下selectCompaction()方法,代码如下:

private CompactionContext selectCompaction(final HRegion r, final Store s,
      int priority, CompactionRequest request) throws IOException {
    
	// 调用HStore的requestCompaction()方法,获取CompactionContext
	CompactionContext compaction = s.requestCompaction(priority, request);
    if (compaction == null) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting " + r.getRegionNameAsString() +
            " because compaction request was cancelled");
      }
      return null;
    }
    
    // 确保CompactionContext中合并请求request不为空
    assert compaction.hasSelection();
    
    // 设置priority
    if (priority != Store.NO_PRIORITY) {
      compaction.getRequest().setPriority(priority);
    }
    return compaction;
  }

        而这个方法最终还是调用HStore的requestCompaction()方法来获取CompactionContext,继续分析:

  @Override
  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    
	// don't even select for compaction if writes are disabled
	// 如果对应HRegion不可写,直接返回null
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Before we do compaction, try to get rid of unneeded files to simplify things.
    // 在我们做合并之前,试着摆脱不必要的文件来简化事情
    removeUnneededFiles();

    // 通过存储引擎storeEngine创建合并上下文CompactionContext
    CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    
    // 加读锁
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
    	// 如果存在协处理器
        if (this.getCoprocessorHost() != null) {
        	
          // 通过CompactionContext的preSelect()方法,选择StoreFile,返回StoreFilel列表
          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          
          // 如果存在协处理器,且其preCompactSelection()方法返回true,使用CompactionContext的forceSelect()方法,进行覆盖
          boolean override = this.getCoprocessorHost().preCompactSelection(
              this, candidatesForCoproc, baseRequest);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {// 如果合并请求为空,即不存在协处理器
          
          // 是否为UserCompaction
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
        	// 调用CompactionContext的select()方法
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
        }

        // Selected files; see if we have a compaction with some custom base request.
        if (baseRequest != null) {// 如果之前传入的请求不为空,则合并之
          // Update the request with what the system thinks the request should be;
          // its up to the request if it wants to listen.
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }
        
        // Finally, we have the resulting files list. Check if we have any files at all.
        // 获取合并请求request
        request = compaction.getRequest();
        
        // 从合并请求request中获取待合并文件集合selectedFiles
        final Collection<StoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {// 如果selectedFiles为空,直接返回null
          return null;
        }

        // 将选择的文件集合加入到filesCompacting中,解答了之前文章的疑问
        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        // 是否为major合并
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        // 设置优先级
        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        // 设置描述信息
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      // 解除读锁
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    
    // 调用HRegion的reportCompactionRequestStart()方法,汇报一个compact请求开始
    this.region.reportCompactionRequestStart(request.isMajor());
    
    // 返回合并上下文compaction
    return compaction;
  }
        这里我们只叙述下主要过程,requestCompaction()方法的处理逻辑大体如下:

        1、如果对应HRegion不可写,直接返回null;

        2、在我们做合并之前,试着摆脱不必要的文件来简化事情;

        3、通过存储引擎storeEngine创建合并上下文CompactionContext类型的compaction;

        4、加读锁;

        5、如果存在协处理器:通过CompactionContext的preSelect()方法,选择StoreFile,返回StoreFilel列表;

        6、如果合并请求为空,即不存在协处理器:调用CompactionContext的select()方法,初始化compaction中的合并请求requst;

        7、如果之前传入的请求baseRequest不为空,则合并之;

        8、获取合并请求request;

        9、从合并请求request中获取待合并文件集合selectedFiles;

        10、将选择的文件集合加入到filesCompacting中,解答了之前文章的疑问;

        11、设置标志位forceMajor:是否为major合并;

        12、request中设置优先级、设置描述信息;

        13、解除读锁;

        14、调用HRegion的reportCompactionRequestStart()方法,汇报一个compact请求开始;

        15、返回合并上下文compaction。

        现在我们着重看下如何通过调用CompactionContext的select()方法初始化compaction中的合并请求requst,其他步骤比较简单,在此不一一叙述了。

        现在我们就看下其默认实现类DefaultCompactionContext中的select()方法,代码如下:

@Override
    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
      
      // 利用合并策略compactionPolicy的selectCompaction()方法,获取合并请求request
      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
          filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
      
      // 返回是否得到request的标志,true or false
      return request != null;
    }
        它是利用合并策略compactionPolicy的selectCompaction()方法,获取合并请求request。那么按照上面讲的,我看下合并策略的一种实现RatioBasedCompactionPolicy的selectCompaction()方法实现,代码如下:

/**
   * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
   * @return subset copy of candidate list that meets compaction criteria
   * @throws java.io.IOException
   */
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    
	// Preliminary compaction subject to filters
	// 初步压缩过滤器,即根据传入的参数candidateFiles,创建一个候选的StoreFile列表
    // candidateFiles为通过storeFileManager.getStorefiles()方法获取的Store下的全部存储文件
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because ratio policy excludes some
    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
    
    // 确定futureFiles,如果filesCompacting为空则为0,否则为1
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    
    // 从候选列表candidateSelection中排除正在合并的文件,即filesCompacting中的文件
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    // If we can't have all files, we cannot do major anyway
    // 验证是否包含所有文件,设置标志位isAllFiles,判断的条件就是此时的候选列表candidateSelection大小是否等于初始的candidateFiles列表大小,
    // 而candidateFiles代表了Store下的全部文件
    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
    
    // 如果没有包含所有文件,则不可能为一个Major合并
    if (!(forceMajor && isAllFiles)) {
      // 如果不是强制的Major合并,且不包含所有的文件,则调用skipLargeFiles()方法,跳过较大文件
      candidateSelection = skipLargeFiles(candidateSelection);
      
      // 再次确定标志位isAllFiles
      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }

    // Try a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested as a major compaction
    // 确定isTryingMajor,共三种情况:
    // 1、强制Major合并,且包含所有问文件,且是一个用户合并
    // 2、强制Major合并,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
        || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
    // Or, if there are any references among the candidates.
    // candidates中存在引用的话,则视为是在分裂后的文件
    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
    
    // 如果不是TryingMajor,且不是在分裂后
    if (!isTryingMajor && !isAfterSplit) {
      // We're are not compacting all files, let's see what files are applicable
      // 再次筛选文件
      candidateSelection = filterBulk(candidateSelection);// 取出不应该位于Minor合并的文件
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    
    // candidateSelection中移除过量的文件
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
    
    // Now we have the final file list, so we can determine if we can do major/all files.
    // 查看是否为全部文件
    isAllFiles = (candidateFiles.size() == candidateSelection.size());
    
    // 利用candidateSelection构造合并请求CompactionRequest对象result
    CompactionRequest result = new CompactionRequest(candidateSelection);
    
    // 设置请求中的标志位
    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
    
    // 返回合并请求CompactionRequest对象result
    return result;
  }
        我们捡重点的说,大体流程如下:

        1、根据传入的参数candidateFiles,创建一个候选的StoreFile列表;

               candidateFiles为通过storeFileManager.getStorefiles()方法获取的Store下的全部存储文件。

        2、确定futureFiles,如果filesCompacting为空则为0,否则为1;

        3、从候选列表candidateSelection中排除正在合并的文件,即filesCompacting中的文件;

        4、验证是否包含所有文件,设置标志位isAllFiles,判断的条件就是此时的候选列表candidateSelection大小是否等于初始的candidateFiles列表大小,而candidateFiles代表了Store下的全部文件;

        5、如果不是强制的Major合并,且不包含所有的文件,则调用skipLargeFiles()方法,跳过较大文件,并再次确定标志位isAllFiles;

        6、确定isTryingMajor,共两种情况:

            (1)强制Major合并,且包含所有问文件,且是一个用户合并;

            (2)强制Major合并,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目;

        7、candidates中存在引用的话,则视为是在分裂后的文件,即isAfterSplit为true;

        8、如果不是TryingMajor,且不是在分裂后isAfterSplit,再次筛选文件:

              8.1、通过filterBulk()方法取出不应该位于Minor合并的文件;

              8.2、通过applyCompactionPolicy()方法,使用一定的算法,进行文件的筛选;

              8.3、通过checkMinFilesCriteria()方法,判断是否满足合并时最小文件数的要求;

        9、通过removeExcessFiles()方法在candidateSelection中移除过量的文件;

        10、查看是否为全部文件:再次确定标志位isAllFiles;

        11、利用candidateSelection构造合并请求CompactionRequest对象result;

        12、设置请求中的标志位;

        13、返回合并请求CompactionRequest对象result。

        我们主要分析下其中文件筛选的一些方法。

        首先看跳过大文件的skipLargeFiles()方法,代码如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * exclude all files above maxCompactSize
   * Also save all references. We MUST compact them
   */
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
      // 最主要的一个判断,列表指定位置的文件大小是否超过阈值comConf.getMaxCompactSize()
      // 这个阈值优先取参数hbase.hstore.compaction.max.size,参数未配置的话取Long.MAX_VALUE
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      // 由此可见candidates应该是一个以文件大小倒序排序的列表
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }
        它会遍历文件列表candidates,最主要的一个判断,列表指定位置的文件大小是否超过阈值comConf.getMaxCompactSize(),这个阈值优先取参数hbase.hstore.compaction.max.size,参数未配置的话取Long.MAX_VALUE。

        其次再看下取出不应该位于Minor合并的文件的filterBulk()方法,代码如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * exclude all bulk load files if configured
   */
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    
	// 去除掉不应该在Minor合并中的文件:根据 StoreFile的标志位excludeFromMinorCompaction判断
	// 它的判断是HFile信息的元数据中存在EXCLUDE_FROM_MINOR_COMPACTION标志
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }
        它根据StoreFile的标志位excludeFromMinorCompaction判断,而excludeFromMinorCompaction为true是当HFile信息的元数据中存在EXCLUDE_FROM_MINOR_COMPACTION标志时设置的,说了这么多,其实它就是要排除BulkLoad进入HBase的文件!

        然后,我们再看下比较复杂的applyCompactionPolicy()方法,代码如下:

/**
    * @param candidates pre-filtrate
    * @return filtered subset
    * -- Default minor compaction selection algorithm:
    * choose CompactSelection from candidates --
    * First exclude bulk-load files if indicated in configuration.
    * Start at the oldest file and stop when you find the first file that
    * meets compaction criteria:
    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
    * OR
    * (2) within the compactRatio of sum(newer_files)
    * Given normal skew, any newer files will also meet this criteria
    * <p/>
    * Additional Note:
    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
    * compact().  Consider the oldest files first to avoid a
    * situation where we always compact [end-threshold,end).  Then, the
    * last file becomes an aggregate of the previous compactions.
    *
    * normal skew:
    *
    *         older ----> newer (increasing seqID)
    *     _
    *    | |   _
    *    | |  | |   _
    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
    *    | |  | |  | |  | |  _  | |
    *    | |  | |  | |  | | | | | |
    *    | |  | |  | |  | | | | | |
    */
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    
	// 如果文件列表为空,原样返回  
	if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    
    // 获取文件合并比例:取参数hbase.hstore.compaction.ratio,默认为1.2
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {// 如果不是在峰值使用
      // 取参数hbase.hstore.compaction.ratio.offpeak,默认为5.0
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    
    // 待合并文件数目countOfFiles
    final int countOfFiles = candidates.size();
    
    // 用于存放文件大小的数组fileSizes
    long[] fileSizes = new long[countOfFiles];
    
    // 用于存放该文件之后在最大文件数这个范围内所有文件(包含该文件)大小合计的数组sumSize
    long[] sumSize = new long[countOfFiles];
    
    // 倒序遍历candidates文件泪飙
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      
      // 将文件大小放入数组fileSizes
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      // tooFar表示后移动最大文件数位置的文件大小,其实也就是刚刚满足达到最大文件数位置的那个文件,也就是说,从i至tooFar数目为合并时允许的最大文件数
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      
      // 计算合计:该文件大小fileSizes[i] + (截止到下一个文件大小sumSize[i + 1]) - 后移动最大文件数位置的文件大小
      // 也就是说sumSize[i]的值,涉及到的文件数目,永远是满足合并时允许的最大文件数这个阈值的,它相当于一个滑动的区间,区间大小为合并时允许的最大文件数
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    // 倒序循环,如果文件数目满足最小合并时允许的最小文件数,且该位置的文件大小,
    // 大于合并时允许的文件最小大小与下一个文件窗口文件总大小乘以一定比例中的较大者,则继续,
    // 实际上就是选择出一个文件窗口内能最小能满足的文件大小的一组文件
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      // 保证最小文件数目的要求
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    
    // 截取
    candidates.subList(0, start).clear();
    return candidates;
  }
        这个applyCompactionPolicy()方法是RatioBasedCompactionPolicy合并策略的精髓,我们需要细细分析,它的主要步骤为:

        1、如果文件列表为空,原样返回;

        2、获取文件合并比例:取参数hbase.hstore.compaction.ratio,默认为1.2,如果可以在峰值使用,取参数hbase.hstore.compaction.ratio.offpeak,默认为5.0,也就是说将参数调整大些;

        3、计算待合并文件数目countOfFiles;

        4、定义用于存放文件大小的数组fileSizes;

        5、定义用于存放该文件之后在最大文件数这个范围内所有文件(包含该文件)大小合计的数组sumSize;

        6、倒序遍历candidates文件列表:

              6.1、将文件大小放入数组fileSizes指定位置;

              6.2、tooFar表示后移动最大文件数位置的文件大小,其实也就是从i开始刚刚满足达到最大文件数位置的那个文件,也就是说,从i至tooFar数目为合并时允许的最大文件数,它类似于一个平滑的文件窗口;

              6.3、计算合计:该文件大小fileSizes[i] + (截止到下一个文件大小sumSize[i + 1]) - 后移动最大文件数位置的文件大小,也就是说sumSize[i]对应的被统计文件,永远是满足合并时允许的最大文件数这个阈值的,它相当于一个滑动的区间,区间大小为合并时允许的最大文件数,sumSize[i]对应的值为已该i开始所处文件窗口的所有文件大小合计。

        7、正序循环,如果文件数目满足最小合并时允许的最小文件数,且该位置的文件大小,大于合并时允许的文件最小大小与下一个文件窗口文件总大小乘以一定比例中的较大者,则继续,实际上就是选择出一个文件窗口内能兼顾最小文件数和最小文件大小的一组文件;

        8、保证最小文件数目的要求,必要时进行截取;

        9、截取并返回截取后的文件列表。

        上面的一个中心思想就是选出满足条件的最小的一组文件来合并。

        紧接着,我们看下检测是否满足最小文件数大的checkMinFilesCriteria()方法,代码如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * forget the compactionSelection if we don't have enough files
   */
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
          " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }
        很直接有木有,不满足合并时最小文件数要求,直接clear,太奔放了!

        最后,我们看下如何移除过量的文件,即removeExcessFiles()方法,代码如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * take upto maxFilesToCompact from the start
   */
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    // 是否过量:文件列表大小减去满足合并的最大文件数
	int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
          + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }

        它是要求待合并文件数不能超过系统设置的合并时最大文件数。

        至此,合并请求的生成和文件的选择就到此为止了。

        接下来再回到CompactSplitThread的requestCompactionInternal()方法,看下它对线程池是如何处理的。这里,它首先假设大部分合并都是small,所以它将系统引发的合并放进small pool,然后在特定的时机再做决断,如果有必要的话会挪至large pool。也就是说,如果selectNow为false,即系统自身引发的合并,比如MemStore flush、compact检查线程等,统一放入到shortCompactions中,即small pool;而如果是人为触发的,即selectNow为true,比如HBase shell触发的,则还要看HStore中合并请求大小是否超过阈值,超过则放入longCompactions,即large pool,否则还是small pool。

        那么这个HStore中合并请求大小是否超过阈值是如何计算的呢?我们跟踪下HStore的throttleCompaction()方法,代码如下:

@Override
  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }
        它实际上是调用的合并策略CompactionPolicy的throttleCompaction()方法。那么,都有哪几种合并策略呢?总结起来,一共有两种:RatioBasedCompactionPolicy和StripeCompactionPolicy。现在我们以RatioBasedCompactionPolicy为例来讲,另一种StripeCompactionPolicy以后再分析。看下它的throttleCompaction()方法:

/**
   * @param compactionSize Total size of some compaction
   * @return whether this should be a large or small compaction
   */
  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }
        它是将传入的compactionSize与comConf.getThrottlePoint()来比较的,传入的compactionSize实际上为上面提到的compaction.getRequest().getSize(),也就是合并请求的大小totalSize,这个totalSize是通过CompactionRequest的recalculateSize()方法计算得到的,代码如下:

/**
   * Recalculate the size of the compaction based on current files.
   * @param files files that should be included in the compaction
   */
  private void recalculateSize() {
    long sz = 0;
    for (StoreFile sf : this.filesToCompact) {
      Reader r = sf.getReader();
      sz += r == null ? 0 : r.length();
    }
    this.totalSize = sz;
  }
        它遍历待合并文件StoreFile,获取其Reader,通过它获得文件长度并累加至totalSize。

        而comConf是其父类CompactionPolicy中关于compact配置的CompactionConfiguration类型成员变量,其getThrottlePoint()方法如下:

/**
   * @return ThrottlePoint used for classifying small and large compactions
   */
  public long getThrottlePoint() {
    return throttlePoint;
  }
        实际上取得是CompactionConfiguration的成员变量throttlePoint,而throttlePoint在其构造方法中定义如下:

throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
        优先取参数hbase.regionserver.thread.compaction.throttle,如果参数未配置,默认为最大合并文件数maxFilesToCompact与MemStore flush大小的两倍,而这个maxFilesToCompact的取值如下:

maxFilesToCompact = conf.getInt(HBASE_HSTORE_COMPACTION_MAX_KEY, 10);
        也就是取参数hbase.hstore.compaction.max,参数未配置的话默认为10。那么MemStore flush大小是如何获取的呢?它实际上是通过StoreConfigInformation接口的getMemstoreFlushSize()方法获取的,而需要使用的最终实现该方法的类,还是HStore,代码如下:

@Override
  public long getMemstoreFlushSize() {
    // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
    return this.region.memstoreFlushSize;
  }
        各位看官可能有疑问了,既然compact是以Store为单位进行的,为什么这里获取的是region的memstoreFlushSize呢?我们知道,HBase并不是一个纯粹意义上的列式数据库,它的MemStore flush的发起,并不是以Store为单位进行的,而是整个Region,这也是HBase一开始饱受诟病的列簇Column Family不能过多的原因。那么,这里的memstoreFlushSize就可以很容易理解为什么要获取Region的了。

        这个memstoreFlushSize我们之前介绍过,这里再回顾下,memstoreFlushSize为HRegion上设定的一个阈值,当MemStore的大小超过这个阈值时,将会发起flush请求,它的计算首先是由Table决定的,即每个表可以设定自己的memstoreFlushSize,通过关键字MEMSTORE_FLUSHSIZE来设定,如果MEMSTORE_FLUSHSIZE未设定,则取参数hbase.hregion.memstore.flush.size,参数未配置的话,则默认为1024*1024*128L,即128M。

        用俺们山东人的话来说,落落了这么多,到底是什么意思呢?很简单,它就是看合并请求中涉及的数据量大小是否超过一个阈值,超过则放入large pool,未超过则放入small pool。这个阈值可以通过参数直接配置,不配置的话,则是最大可合并文件数与引起MemStore的flush的阈值memstoreFlushSize的两倍,这个memstore flush到文件中,是不是就是文件的总大小呢?文件数乘以文件大小,是不是逻辑上近似于待合并数据的大小呢?大体就是这么个意思。

        好了,“数据”的目的地--线程池选好了,接下来就是该把“数据”放入线程池了。既然是线程池,那么这个“数据”就应该是一个线程,我们继续看。

// 将合并请求包装成CompactionRunner,扔进线程池去执行
    pool.execute(new CompactionRunner(s, r, compaction, pool));

        这一句体现的再明白不过了,将HStore、HRegion、合并上下文CompactionContext、线程池ThreadPoolExecutor包装成一个CompactionRunner对象,扔入线程池中执行。而CompactionRunner给我们的第一印象就是,它必定是一个可执行的线程。那么我们就看下它的代码吧:

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification="Contrived use of compareTo")
  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
	
	// 类的定义中直接体现了,实现了Runnable接口意味着它是一个线程。
	
    private final Store store;
    private final HRegion region;
    private CompactionContext compaction;
    private int queuedPriority;
    private ThreadPoolExecutor parent;

    public CompactionRunner(Store store, HRegion region,
        CompactionContext compaction, ThreadPoolExecutor parent) {
      super();
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      
      // 合并排队的优先级,如果合并上下文compaction为空,则通过HStore的getCompactPriority()方法获取,否则直接从合并请求中获取,
      // 而合并请求中的,实际上也是通过调用requestCompactionInternal()方法的priority传入的
      this.queuedPriority = (this.compaction == null)
          ? store.getCompactPriority() : compaction.getRequest().getPriority();
      this.parent = parent;
    }
  }
        先看类的定义,类的定义中直接体现了,实现了Runnable接口意味着它是一个线程。而它除了构造函数传入的那四个成员变量外,还有个表示优先级的成员变量queuedPriority,它的初始化是在构造方法中完成的。如果合并上下文compaction为空,则通过HStore的getCompactPriority()方法获取,否则直接从合并请求中获取,而合并请求中的,实际上也是通过调用requestCompactionInternal()方法的priority传入的。我们接下来看下HStore的getCompactPriority()方法:

@Override
  public int getCompactPriority() {
	  
	// 从StoreFileManager中获取Compact Priority
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }
        它转而从StoreFileManager中获取Compact Priority,继续吧!在StoreFileManager的默认实现DefaultStoreFileManager中,代码如下:

  @Override
  public int getStoreCompactionPriority() {
	
	// blockingFileCount优先取参数hbase.hstore.blockingStoreFiles,未配置的话再默认为7
	// 还记得isTooManyStoreFiles这个方法吗?MemStore在进行flush时会判断HRegion上每个HStore下的文件数是否太多,
    // 太多则意味着MemStore的flush会被推迟进行,优先进行compact,否则文件数则会越来越多,而这里,离blockingFileCount越远,当前文件数越小
	// 的话,则意味着MemStore的flush可以优先进行,而compact可以在它flush之后再进行,将资源利用效率最大化
    int blockingFileCount = conf.getInt(
        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
    
    // 优先级为上述blockingFileCount减去当前storefiles的数目
    int priority = blockingFileCount - storefiles.size();
    
    // 如果priority为1,则返回2,否则返回原值
    return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
  }
        优先级为上述blockingFileCount减去当前storefiles的数目。而blockingFileCount优先取参数hbase.hstore.blockingStoreFiles,未配置的话再默认为7。还记得isTooManyStoreFiles这个方法吗?MemStore在进行flush时会判断HRegion上每个HStore下的文件数是否太多,太多则意味着MemStore的flush会被推迟进行,优先进行compact,否则文件数则会越来越多,而这里,离blockingFileCount越远,当前文件数越小的话,则意味着MemStore的flush可以优先进行,而compact可以在它flush之后再进行,将资源利用效率最大化。

        接下来,我们在看下CompactionRunner中最重要的run()方法,代码如下:

@Override
    public void run() {
      Preconditions.checkNotNull(server);
      
      // 首选做一些必要的环境判断,比如HRegionServer是否已停止、HRegion对应的表是否允许Compact操作
      if (server.isStopped()
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;
      }
      // Common case - system compaction without a file selection. Select now.
      // 常见的,系统合并还没有选择待合并的文件。现在选择下。
      if (this.compaction == null) {
        
    	// 之前的Compact优先级赋值给oldPriority
    	int oldPriority = this.queuedPriority;
        
        // 获取HStore的Compact优先级
        this.queuedPriority = this.store.getCompactPriority();
        
        // 如果当前优先级queuedPriority大于之前的oldPriority
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          
          // 将该CompactionRunner在扔回线程池
          this.parent.execute(this);
          return;
        }
        
        // 选择storefile
        try {
          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          return;
        }
        if (this.compaction == null) return; // nothing to do
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.
        
        // 确保合并请求存在
        assert this.compaction.hasSelection();
        
        // 再次判断下是应该在large池中执行还是应该在small池中执行,此次只根据上述的那个阈值来判断
        ThreadPoolExecutor pool = store.throttleCompaction(
            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
        
        if (this.parent != pool) {// 换池了
          // HStore取消合并请求
          this.store.cancelRequestedCompaction(this.compaction);
          
          // 复位compaction为null
          this.compaction = null;
          
          // 换池
          this.parent = pool;
          
          // 放入线程池,后续会再初始化compaction
          this.parent.execute(this);
          return;
        }
      }
      
      // Finally we can compact something.
      // 确保compaction不为空
      assert this.compaction != null;

      // 执行之前
      this.compaction.getRequest().beforeExecute();
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CST logic.
        
    	// 执行开始时间
    	long start = EnvironmentEdgeManager.currentTime();
        
    	// 调用HRegion的compact,针对store执行compact
    	boolean completed = region.compact(compaction, store);
        
    	// 计算执行时间
    	long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        
        // 根据合并结果确定下一步操作
        if (completed) {// 如果合并成功
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
        	// 如果优先级Priority小于等于0,意味着当前文件已经太多,则需要发起一次SystemCompaction
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
        	// 请求分裂,实际上是看Region的大小是否超过阈值,从而引起分裂
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        server.checkFileSystem();
      } finally {
        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
      }
      
      // 请求合并之后的处理
      this.compaction.getRequest().afterExecute();
    }
        run()方法以上来,也是会首选做一些必要的环境判断,比如HRegionServer是否已停止、HRegion对应的表是否允许Compact操作等。

        然后,针对compaction为null的情况,进行compaction的初始化,即待合并文件的选择。在这个过程之前,会先判断下优先级,之前的Compact优先级赋值给oldPriority,获取HStore的Compact优先级,如果当前优先级queuedPriority大于之前的oldPriority的话,即HStore下文件数目减少了,则会推迟compact,可以优先进行flush,将该CompactionRunner再扔回线程池。如果优先级满足条件,则继续,通过selectCompaction()选择待合并文件,并再次判断下是应该在large池中执行还是应该在small池中执行,此次只根据上述的那个阈值来判断。

        接下来,如果换池了,HStore调用cancelRequestedCompaction()方法取消合并请求,复位compaction为null,换池,并再次放入线程池,后续会再初始化compaction,然后就return。

        如果没换池的话,确保compaction不为空,调用HRegion的compact,针对store执行compact,计算执行时间,并获得compact的执行结果,根据合并结果确定下一步操作。

        如果合并成功,如果优先级Priority小于等于0,意味着当前文件已经太多,则需要发起一次SystemCompaction,否则请求分裂,实际上是看Region的大小是否超过阈值,从而引起分裂。

        整个CompactSplitThread的工作流程已描述完毕。那么接下来的问题,就是何时什么情况下会发起compact请求?发起的compact请求又有如何不同呢?是否会有定期检查的工作线程,促使compact在满足一定条件的情况下进行呢?

        且听下回分解。

        

        










上一篇:HBase源码分析之MemStore的flush发起时机、判断条件等详情


下一篇:主流开源分布式图数据库 Benchmark