In 《HBase源码分析之HRegion上compact流程分析(二)》 (part two of this series on the compact flow in HRegion), we did not cover CompactionContext's compact() method, which is where the merge actually happens. Let's now walk through its implementation.
First of all, CompactionContext carries the context of a compaction. It is only an abstract class, and its compact() method has no implementation:
/**
 * Runs the compaction based on current selection. select/forceSelect must have been called.
 * @return The new file paths resulting from compaction.
 */
public abstract List<Path> compact() throws IOException;

So we need to look at its concrete subclasses. There are two of them, DefaultCompactionContext and StripeCompaction; here we take DefaultCompactionContext as the example.
First, DefaultCompactionContext's implementation of compact():
@Override
public List<Path> compact() throws IOException {
  return compactor.compact(request);
}

The compactor can be configured through the hbase.hstore.defaultengine.compactor.class parameter; the default implementation is DefaultCompactor.
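As a hedged illustration (the class name com.example.MyCompactor below is hypothetical and not part of the HBase source discussed here), swapping in a different compactor is only a configuration change; in practice the parameter would normally be set in hbase-site.xml rather than in code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactorConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Point the default store engine at a hypothetical Compactor subclass.
    conf.set("hbase.hstore.defaultengine.compactor.class",
        "com.example.MyCompactor");
    System.out.println(conf.get("hbase.hstore.defaultengine.compactor.class"));
  }
}

With the default in place, let's look at DefaultCompactor's implementation of compact():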
/**
 * Do a minor/major compaction on an explicit set of storefiles from a Store.
 */
public List<Path> compact(final CompactionRequest request) throws IOException {
  // Extract the file details fd (a FileDetails instance) from the request
  FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
  // Construct the CompactionProgress tracker
  this.progress = new CompactionProgress(fd.maxKeyCount);

  // Find the smallest read point across all the Scanners,
  // i.e. the lowest point from which data may still need to be read
  long smallestReadPoint = getSmallestReadPoint();

  List<StoreFileScanner> scanners;
  Collection<StoreFile> readersToClose;
  // hbase.regionserver.compaction.private.readers decides whether to use private readers
  if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
    // clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
    // HFiles, and their readers
    // Create a StoreFile list, readersToClose, sized to the number of files in the request
    readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
    // Copy each file to be compacted into readersToClose
    for (StoreFile f : request.getFiles()) {
      readersToClose.add(new StoreFile(f));
    }
    // Create the file scanners from readersToClose, i.e. from the copies
    scanners = createFileScanners(readersToClose, smallestReadPoint);
  } else {
    // Use an empty readersToClose list
    readersToClose = Collections.emptyList();
    // Create the file scanners directly from the files in the request
    scanners = createFileScanners(request.getFiles(), smallestReadPoint);
  }

  StoreFile.Writer writer = null;
  List<Path> newFiles = new ArrayList<Path>();
  boolean cleanSeqId = false;
  IOException e = null;
  try {
    InternalScanner scanner = null;
    try {
      /* Include deletes, unless we are doing a compaction of all files */
      // Determine the scan type:
      //   a MAJOR or ALL_FILES compaction -> COMPACT_DROP_DELETES
      //   a MINOR compaction              -> COMPACT_RETAIN_DELETES
      ScanType scanType =
          request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
      // Give coprocessors, if any, a chance to create the scanner
      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
      if (scanner == null) {
        // No scanner was created by a coprocessor, so create one with createScanner()
        scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
      }
      // Let coprocessors wrap or replace the scanner through their preCompact() hook
      scanner = postCreateCoprocScanner(request, scanType, scanner);
      if (scanner == null) {
        // NULL scanner returned from coprocessor hooks means skip normal processing.
        return newFiles;
      }
      // Create the writer even if no kv(Empty store file is also ok),
      // because we need record the max seq id for the store file, see HBASE-6059
      // Settle on the final smallestReadPoint
      if (fd.minSeqIdToKeep > 0) {
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      // When all MVCC readpoints are 0, don't write them.
      // See HBASE-8166, HBASE-12600, and HBASE-13389.
      // Obtain the writer through HStore.createWriterInTmp()
      writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
          fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
      // Perform the actual compaction
      boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
      // If the compaction did not finish
      if (!finished) {
        // close the writer,
        writer.close();
        // delete the writer's temporary file,
        store.getFileSystem().delete(writer.getPath(), false);
        writer = null;
        // and throw an exception
        throw new InterruptedIOException("Aborting compaction of store " + store +
            " in region " + store.getRegionInfo().getRegionNameAsString() +
            " because it was interrupted.");
      }
    } finally {
      // Close the scanner
      if (scanner != null) {
        scanner.close();
      }
    }
  } catch (IOException ioe) {
    e = ioe;
    // Throw the exception
    throw ioe;
  } finally {
    try {
      if (writer != null) {
        if (e != null) {
          // An exception occurred: just close the writer
          writer.close();
        } else {
          // No exception: append the metadata, close the writer, and record its path in newFiles
          writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
          writer.close();
          newFiles.add(writer.getPath());
        }
      }
    } finally {
      // Close the Reader of every StoreFile in readersToClose
      for (StoreFile f : readersToClose) {
        try {
          f.closeReader(true);
        } catch (IOException ioe) {
          LOG.warn("Exception closing " + f, ioe);
        }
      }
    }
  }
  // Return newFiles
  return newFiles;
}

To summarize, DefaultCompactor's compact() method roughly performs the following steps (a condensed, self-contained sketch of the read/write and cleanup portion follows the list):
1. Through the parent class Compactor's getFileDetails() method, extract the file details fd (a FileDetails instance) from the request. These details mainly include:
(1) maxKeyCount: the total number of KeyValues after the compaction;
(2) earliestPutTs: the earliest Put timestamp, if this is a major compaction;
(3) maxSeqId: the largest sequence id among the files being compacted;
(4) maxMVCCReadpoint: the latest MemStore read point found in any of the involved files;
(5) maxTagsLength: the maximum tag length;
(6) minSeqIdToKeep: the minimum sequence id to keep during a major compaction.
2. Construct the compaction progress tracker CompactionProgress, used to track the compaction as it runs.
3. Through the parent class Compactor's getSmallestReadPoint() method, find the smallest read point smallestReadPoint across all scanners, i.e. the lowest point from which data may still need to be read.
4. Based on the hbase.regionserver.compaction.private.readers parameter (false by default), decide whether to use private readers:
4.1 If the parameter is true, clone all the StoreFiles so that the compaction runs on an independent copy of the StoreFiles, HFiles and their readers:
4.1.1 create a StoreFile list readersToClose sized to the number of files in the request;
4.1.2 copy each file to be compacted into readersToClose;
4.1.3 create the file scanners (FileScanners) from readersToClose, i.e. from the copies.
4.2 If the parameter is false, use the file list sent in the request directly:
4.2.1 use an empty readersToClose list;
4.2.2 create the file scanners from the files in the request.
5. Determine the scan type scanType from the compaction request:
a MAJOR or ALL_FILES compaction uses COMPACT_DROP_DELETES;
a MINOR compaction uses COMPACT_RETAIN_DELETES.
6. If coprocessors are registered, call their preCreateCoprocScanner() hook to obtain a scanner; if no scanner was created by a coprocessor, call createScanner() to create one.
7. If coprocessors are registered, call their preCompact() hook (via postCreateCoprocScanner()).
8. Combine the smallestReadPoint obtained earlier with minSeqIdToKeep from the file details fd to settle the final smallestReadPoint, and set the cleanSeqId flag.
9. Call HStore's createWriterInTmp() method to obtain the writer.
10. Call the parent class Compactor's performCompaction() method, which uses the scanner, writer, smallestReadPoint and cleanSeqId to do the merge:
in essence, it reads the old files' data through the scanner and writes it into the new file through the writer.
11. If the compaction did not finish: close the writer, delete the writer's temporary file, and throw an exception.
12. Close the scanner.
13. If an exception occurred, just close the writer; if not, append the metadata, close the writer, and add its path to newFiles.
14. Close the Reader of every StoreFile in readersToClose, one by one.
15. Return newFiles.
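Before digging into the individual steps, here is a condensed, self-contained sketch of the read-merge-write and cleanup portion of this flow (roughly steps 10 through 15). The Record, CellScanner and CellWriter types are hypothetical stand-ins introduced only for this illustration; they are not HBase classes, and the sketch deliberately omits the private-reader, coprocessor and scan-type handling shown above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CompactionFlowSketch {

  // Hypothetical reader over the old files: returns records until exhausted.
  interface CellScanner {
    Record next() throws IOException;
    void close() throws IOException;
  }

  // Hypothetical writer for the new file.
  interface CellWriter {
    void append(Record r) throws IOException;
    void appendMetadata(long maxSeqId) throws IOException;
    String path();
    void close() throws IOException;
  }

  static class Record {
    long sequenceId;
    byte[] value;
  }

  /** Condensed analogue of steps 10-15: read old data, write the new file, then clean up. */
  static List<String> compact(CellScanner scanner, CellWriter writer, long maxSeqId)
      throws IOException {
    List<String> newFiles = new ArrayList<String>();
    IOException failure = null;
    try {
      Record r;
      while ((r = scanner.next()) != null) {
        writer.append(r);                 // step 10: copy data from the old files to the new one
      }
    } catch (IOException ioe) {
      failure = ioe;                      // step 11: remember and propagate the failure
      throw ioe;
    } finally {
      scanner.close();                    // step 12: always close the scanner
      if (failure != null) {
        writer.close();                   // step 13, failure path: just close the writer
      } else {
        writer.appendMetadata(maxSeqId);  // step 13, success path: metadata, close, record the path
        writer.close();
        newFiles.add(writer.path());
      }
      // step 14 (closing any cloned readers) is omitted from this sketch
    }
    return newFiles;                      // step 15: hand back the new file paths
  }
}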
That is the overall flow. Now let's go through some of the details one by one.
First, the file details FileDetails, which are obtained through the getFileDetails() method. The FileDetails class is defined as follows:
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
protected static class FileDetails {
  /** Maximum key count after compaction (for blooms) */
  public long maxKeyCount = 0;
  /** Earliest put timestamp if major compaction */
  public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  /** The last key in the files we're compacting. */
  // In practice this holds the largest sequence id among the files being compacted
  public long maxSeqId = 0;
  /** Latest memstore read point found in any of the involved files */
  public long maxMVCCReadpoint = 0;
  /** Max tags length **/
  public int maxTagsLength = 0;
  /** Min SeqId to keep during a major compaction **/
  public long minSeqIdToKeep = 0;
}
And it is populated by the following method:
/**
 * Extracts some details about the files to compact that are commonly needed by compactors.
 * @param filesToCompact Files.
 * @param allFiles Whether all files are included for compaction
 * @return The result.
 */
protected FileDetails getFileDetails(
    Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
  // Construct a FileDetails instance, fd
  FileDetails fd = new FileDetails();
  // Cutoff timestamp: HFiles modified before this point no longer need their MVCC/seqId
  // information kept. It is computed as now - 24h * keepSeqIdPeriod, where keepSeqIdPeriod
  // is the configured number of days MVCC values must survive across major compactions.
  long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
    (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

  // Iterate over the files to be compacted
  for (StoreFile file : filesToCompact) {
    // If all files are being compacted and this file is older than the cutoff above
    if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
      // when isAllFiles is true, all files are compacted so we can calculate the smallest
      // MVCC value to keep
      if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
        // Lift fd.minSeqIdToKeep up to this file's max memstore timestamp
        fd.minSeqIdToKeep = file.getMaxMemstoreTS();
      }
    }
    // Track the largest sequence id across the files being compacted
    long seqNum = file.getMaxSequenceId();
    fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
    // Get the Reader
    StoreFile.Reader r = file.getReader();
    if (r == null) {
      LOG.warn("Null reader for " + file.getPath());
      continue;
    }
    // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
    // blooms can cause progress to be miscalculated or if the user switches bloom
    // type (e.g. from ROW to ROWCOL)
    // getEntries() is the number of KeyValues in the file; HBase stores every column as a
    // KeyValue, whose key contains rowkey + column family + qualifier + timestamp etc.
    // and whose value is the column value
    long keyCount = r.getEntries();
    // Accumulate into maxKeyCount
    fd.maxKeyCount += keyCount;
    // calculate the latest MVCC readpoint in any of the involved store files
    // First load the file info
    Map<byte[], byte[]> fileInfo = r.loadFileInfo();
    byte tmp[] = null;
    // Get and set the real MVCCReadpoint for bulk loaded files, which is the
    // SeqId number.
    if (r.isBulkLoaded()) {
      // For bulk-loaded files, take the larger of fd.maxMVCCReadpoint and the file's SequenceID
      fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
    } else {
      // Otherwise read MAX_MEMSTORE_TS_KEY from the file info
      tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
      if (tmp != null) {
        // and take the larger of fd.maxMVCCReadpoint and that value
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
      }
    }
    // Update the maximum tags length
    tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
    if (tmp != null) {
      fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
    }
    // If required, calculate the earliest put timestamp of all involved storefiles.
    // This is used to remove family delete marker during compaction.
    long earliestPutTs = 0;
    if (allFiles) {
      tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
      if (tmp == null) {
        // There's a file with no information, must be an old one
        // assume we have very old puts
        fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
      } else {
        earliestPutTs = Bytes.toLong(tmp);
        fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Compacting " + file +
        ", keycount=" + keyCount +
        ", bloomtype=" + r.getBloomFilterType().toString() +
        ", size=" + StringUtils.humanReadableInt(r.length()) +
        ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
        ", seqNum=" + seqNum +
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
  }
  // Return the file details fd
  return fd;
}

Next, let's look at how the smallest read point across the scanners is found, i.e. the lowest point from which data may still need to be read. It is implemented by the parent class Compactor's getSmallestReadPoint() method:
protected long getSmallestReadPoint() {
  // Delegate to the HStore's smallest read point
  return store.getSmallestReadPoint();
}

As you can see, the parent class simply delegates to HStore's getSmallestReadPoint() method:
@Override
public long getSmallestReadPoint() {
  // Delegate further to the region's smallest read point
  return this.region.getSmallestReadPoint();
}

So HStore ultimately gets the smallest read point from the HRegion. This reflects a point we already know: HBase offers row-level transactions (scoped within a single region), and the smallest read point is tracked by the region's MVCC rather than by individual stores. How that read point is actually maintained will be covered later when we discuss HBase's multi-version concurrency control (MVCC).
Next, let's look at how the file scanners (FileScanners) are created. They are built by the parent class Compactor's createFileScanners() method:
/**
 * Creates file scanners for compaction.
 * @param filesToCompact Files.
 * @return Scanners.
 */
protected List<StoreFileScanner> createFileScanners(
    final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
  return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
    smallestReadPoint);
}

This method creates scanners specifically for compaction, distinct from the scanners used by client reads. Let's continue into StoreFileScanner's getScannersForStoreFiles() method:
/**
 * Return an array of scanners corresponding to the given set of store files,
 * And set the ScanQueryMatcher for each store file scanner for further
 * optimization
 */
public static List<StoreFileScanner> getScannersForStoreFiles(
    Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
    boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
  List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(files.size());
  // Iterate over the StoreFiles
  for (StoreFile file : files) {
    // Get each file's Reader
    StoreFile.Reader r = file.createReader();
    // Obtain a StoreFileScanner from the Reader; this scanner is dedicated to reading StoreFiles
    StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt);
    scanner.setScanQueryMatcher(matcher);
    // Add it to the scanners list
    scanners.add(scanner);
  }
  // Return the list of scanners
  return scanners;
}

This is straightforward, so we won't dwell on it; interested readers can follow the source themselves.
Moving on, let's see how an internal scanner of type InternalScanner is obtained. It comes from createScanner():
/**
 * Creates a scanner for compaction.
 * @param store store
 * @param scanners Store file scanners.
 * @param scanType Scan type.
 * @param smallestReadPoint Smallest MVCC read point.
 * @param earliestPutTs Earliest put across all files.
 * @return A compaction scanner.
 */
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
  // Construct a Scan instance
  Scan scan = new Scan();
  // Set the maximum number of versions to the column family's configured max versions
  // (a hint that compaction also performs data cleanup)
  scan.setMaxVersions(store.getFamily().getMaxVersions());
  // Return a StoreScanner instance
  return new StoreScanner(store, store.getScanInfo(), scan, scanners,
      scanType, smallestReadPoint, earliestPutTs);
}

The scanner here is actually a StoreScanner, an internal scanner over a Store. One important detail: the Scan is created with its maximum version count set to the column family's configured max versions. Does that suggest compaction also cleans data up? It certainly does. When data is modified, HBase does not delete anything in place; it simply adds a new version, and versions beyond the configured limit (as well as expired data) are filtered out during compaction through this max-versions setting on the scanner. This is an efficient way to handle cleanup and is part of what keeps HBase's latency low. A small client-side example of the max-versions setting follows.
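To make that point concrete, here is a hedged example using the standard client API of this era; the table name "t" and family name "cf" are made up for illustration. The max-versions limit that createScanner() picks up comes from the column family descriptor, which can be changed like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MaxVersionsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      // Keep at most 3 versions per cell in family "cf" of table "t"; versions beyond
      // that limit (and expired cells) become candidates for removal the next time
      // the store is compacted.
      HColumnDescriptor cf = new HColumnDescriptor("cf");
      cf.setMaxVersions(3);
      admin.modifyColumn(TableName.valueOf("t"), cf);
    }
  }
}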
With a scanner for reading in hand, let's turn to the writer for writing; after all, merging old files into a new one requires both reads and writes. The writer is created by HStore's createWriterInTmp() method:
/*
 * @param maxKeyCount
 * @param compression Compression algorithm to use
 * @param isCompaction whether we are creating a new file in a compaction
 * @param includesMVCCReadPoint - whether to include MVCC or not
 * @param includesTag - includesTag or not
 * @return Writer for a new StoreFile in the tmp dir.
 */
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
    boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
    throws IOException {
  final CacheConfig writerCacheConf;
  // Is this for a compaction?
  if (isCompaction) {
    // Don't cache data on write on compactions.
    writerCacheConf = new CacheConfig(cacheConf);
    writerCacheConf.setCacheDataOnWrite(false);
  } else {
    writerCacheConf = cacheConf;
  }
  InetSocketAddress[] favoredNodes = null;
  // Look up the favored nodes for this region
  if (region.getRegionServerServices() != null) {
    favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
        region.getRegionInfo().getEncodedName());
  }
  // Create the HFile context
  HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
      cryptoContext);
  // Build the StoreFile.Writer from the information above: file system, file path, comparator,
  // maximum key count, favored nodes, HFile context, etc.
  StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
      this.getFileSystem())                          // file system
          .withFilePath(fs.createTempName())         // file path (in the tmp dir)
          .withComparator(comparator)                // comparator
          .withBloomType(family.getBloomFilterType())
          .withMaxKeyCount(maxKeyCount)              // maximum key count
          .withFavoredNodes(favoredNodes)            // favored nodes
          .withFileContext(hFileContext)             // HFile context
          .build();
  return w;
}

This writer is essentially a StoreFile.Writer, the writer for a store file, and it carries all the key pieces of information: the file system, file path, comparator, maximum key count, favored nodes, HFile context, and so on.
With a scanner to read from and a writer to write to, the actual merge can start: read data from the old files and write it into the new one. Let's look at Compactor's performCompaction() method:
/**
 * Performs the compaction.
 * @param scanner Where to read from.
 * @param writer Where to write to.
 * @param smallestReadPoint Smallest read point.
 * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
 * @return Whether compaction ended; false if it was interrupted for some reason.
 */
protected boolean performCompaction(InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId) throws IOException {
  // Bytes written so far (towards the periodic close check)
  long bytesWritten = 0;
  // Bytes written since the last progress log line
  long bytesWrittenProgress = 0;
  // Since scanner.next() can return 'false' but still be delivering data,
  // we have to use a do/while loop.
  List<Cell> cells = new ArrayList<Cell>();
  // Threshold for the periodic check: amount of compacted data between checks,
  // taken from hbase.hstore.close.check.interval, 10 MB by default
  long closeCheckInterval = HStore.getCloseCheckInterval();
  long lastMillis = 0;
  if (LOG.isDebugEnabled()) {
    lastMillis = EnvironmentEdgeManager.currentTime();
  }
  long now = 0;
  // Loop as long as hasMore is true, i.e. the scanner still has data
  boolean hasMore;
  do {
    // Pull the next batch of cells from the scanner
    hasMore = scanner.next(cells, compactionKVMax);
    if (LOG.isDebugEnabled()) {
      now = EnvironmentEdgeManager.currentTime();
    }
    // output to writer:
    // Iterate over the cells and append them to the writer
    for (Cell c : cells) {
      if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
        CellUtil.setSequenceId(c, 0);
      }
      // Append to the writer
      writer.append(c);
      // KeyValue size
      int len = KeyValueUtil.length(c);
      // Update the counters: number of compacted KVs and total compacted size
      ++progress.currentCompactedKVs;
      progress.totalCompactedSize += len;
      if (LOG.isDebugEnabled()) {
        bytesWrittenProgress += len;
      }
      // check periodically to see if a system stop is requested
      if (closeCheckInterval > 0) {
        // Accumulate the bytes written
        bytesWritten += len;
        // Once bytesWritten exceeds closeCheckInterval
        if (bytesWritten > closeCheckInterval) {
          // reset the counter,
          bytesWritten = 0;
          // and check whether the HStore is still writable; if not, a system stop has been
          // requested, so cancel the compaction via progress
          if (!store.areWritesEnabled()) {
            progress.cancel();
            return false;
          }
        }
      }
    }
    // Log the progress of long running compactions every minute if
    // logging at DEBUG level
    if (LOG.isDebugEnabled()) {
      if ((now - lastMillis) >= 60 * 1000) {
        LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
          (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
        lastMillis = now;
        bytesWrittenProgress = 0;
      }
    }
    // Clear the cell list for the next batch
    cells.clear();
  } while (hasMore);
  // Mark the compaction progress as complete
  progress.complete();
  return true;
}

The compaction itself is fairly simple: a do...while loop keeps reading data from the scanner into the cell list, iterates over the cells and appends each one to the writer while accumulating the KV count and size, and repeats until the scanner runs out of data. In this way the old files' data is read out and written into the new file batch by batch, and finally the compaction is marked as complete via progress. That is essentially the whole flow.
One thing deserves special mention: during the merge, the code periodically checks whether a system shutdown has been requested from outside, and if so, the compaction must be cancelled. The "period" is measured not in time but in the amount of data already compacted, against a threshold closeCheckInterval taken from the parameter hbase.hstore.close.check.interval (10 MB by default). As data is merged, the written size bytesWritten keeps accumulating; once it exceeds closeCheckInterval it is reset to zero, and the HStore's writable state is checked to see whether a stop has been requested. If it has, the compaction is cancelled via progress; otherwise the next accumulate-and-check cycle begins. A tuning example follows.
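If the 10 MB default ever needed adjusting, it is an ordinary configuration knob. A minimal sketch of setting it programmatically is shown below; the 16 MB value is an arbitrary illustration, and in practice this would normally be set in hbase-site.xml rather than in code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CloseCheckIntervalExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Check for a pending store close/stop request after every 16 MB of compacted
    // output instead of the default 10 MB.
    conf.setInt("hbase.hstore.close.check.interval", 16 * 1024 * 1024);
    System.out.println(conf.getInt("hbase.hstore.close.check.interval", 10 * 1024 * 1024));
  }
}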
Next, the result flag finished determines the follow-up handling: if the compaction did not finish, the writer is closed, its temporary file is deleted, and an exception is thrown.
Finally, if an exception e occurred, the writer is simply closed; if no exception occurred, the metadata is appended, the writer is closed, and its path is added to newFiles, which is then returned as the list of compacted files. Either way, the Readers of the StoreFiles in readersToClose are closed one by one at the end.
That completes our analysis of the compact flow in HRegion, down to the level of a single HStore. For reasons of length, some details have been glossed over or omitted; we'll come back to them in later posts.