```java
/**
 * Compact the StoreFiles. This method may take some time, so the calling
 * thread must be able to block for long periods.
 *
 * <p>During this time, the Store can work as usual, getting values from
 * StoreFiles and writing new StoreFiles from the memstore.
 *
 * Existing StoreFiles are not destroyed until the new compacted StoreFile is
 * completely written-out to disk.
 *
 * <p>The compactLock prevents multiple simultaneous compactions.
 * The structureLock prevents us from interfering with other write operations.
 *
 * <p>We don't want to hold the structureLock for the whole time, as a compact()
 * can be lengthy and we want to allow cache-flushes during this period.
 *
 * <p> Compaction event should be idempotent, since there is no IO Fencing for
 * the region directory in hdfs. A region server might still try to complete the
 * compaction after it lost the region. That is why the following events are carefully
 * ordered for a compaction:
 *  1. Compaction writes new files under region/.tmp directory (compaction output)
 *  2. Compaction atomically moves the temporary file under region directory
 *  3. Compaction appends a WAL edit containing the compaction input and output files.
 *  Forces sync on WAL.
 *  4. Compaction deletes the input files from the region directory.
 *
 * Failure conditions are handled like this:
 *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
 *  the compaction later, it will only write the new data file to the region directory.
 *  Since we already have this data, this will be idempotent but we will have a redundant
 *  copy of the data.
 *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
 *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
 *  - If RS fails after 3, the region server that opens the region will pick up the
 *  compaction marker from the WAL and replay it by removing the compaction input files.
 *  Failed RS can also attempt to delete those files, but the operation will be idempotent.
 *
 * See HBASE-2231 for details.
 *
 * @param compaction compaction details obtained from requestCompaction()
 * @throws IOException
 * @return Storefile we compacted into or null if we failed or opted out early.
 */
@Override
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
  assert compaction != null;
  List<StoreFile> sfs = null;
  // Get the compaction request cr from the CompactionContext
  CompactionRequest cr = compaction.getRequest();
  try {
    // Do all sanity checking in here if we have a valid CompactionRequest
    // because we need to clean up after it on the way out in a finally
    // block below
    //
    // Record the compaction start time compactionStartTime
    long compactionStartTime = EnvironmentEdgeManager.currentTime();
    // Make sure the request is not null. getRequest() has in fact already checked this,
    // so why check again here? Let's leave that as a small open question for now.
    assert compaction.hasSelection();
    // Get the files to compact, filesToCompact, from cr; every element is a StoreFile.
    // The collection is fixed when the CompactionRequest is built (or merged with another
    // request), from the arguments passed in or the files carried by the other request:
    // once the request exists, filesToCompact exists too.
    Collection<StoreFile> filesToCompact = cr.getFiles();
    // Make sure filesToCompact is not empty
    assert !filesToCompact.isEmpty();
    // Make sure filesCompacting contains every file in filesToCompact
    synchronized (filesCompacting) {
      // sanity check: we're compacting files that this store knows about
      // TODO: change this to LOG.error() after more debugging
      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
    }

    // Ready to go. Have list of files to compact.
    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + " into tmpdir=" + fs.getTempDir() + ", totalSize="
        + StringUtils.humanReadableInt(cr.getSize()));

    // Commence the compaction.
    // Run the compaction via CompactionContext.compact() and get the new files newFiles
    List<Path> newFiles = compaction.compact();

    // TODO: get rid of this!
    // hbase.hstore.compaction.complete decides whether the compaction is fully completed.
    // Interesting: when it is false, old and new files coexist, the new files are not moved
    // into place and their Readers are closed, so the old files keep serving requests.
    // What is the purpose? Quick use for reads?
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      LOG.warn("hbase.hstore.compaction.complete is set to false");
      // Create the StoreFile list sfs, sized to newFiles
      sfs = new ArrayList<StoreFile>(newFiles.size());
      // For every new compacted file: create a StoreFile and its Reader,
      // close the Reader, and add the StoreFile to sfs
      for (Path newFile : newFiles) {
        // Create storefile around what we wrote with a reader on it.
        StoreFile sf = createStoreFileAndReader(newFile);
        // Close its Reader
        sf.closeReader(true);
        sfs.add(sf);
      }
      // Return the compacted files
      return sfs;
    }
    // Do the steps necessary to complete the compaction.
    // Move the finished files into place, create StoreFiles and Readers, return them as sfs
    sfs = moveCompatedFilesIntoPlace(cr, newFiles);
    // Write the compaction record to the WAL
    writeCompactionWalRecord(filesToCompact, sfs);
    // Replace the StoreFiles:
    // 1. remove all pre-compaction (i.e. compacted) files and add the output sfs to the
    //    StoreFileManager's storefiles, the list of files this Store currently serves;
    // 2. remove filesToCompact from filesCompacting, the list of files being compacted.
    replaceStoreFiles(filesToCompact, sfs);
    // Update different counters depending on the compaction type, for metrics
    if (cr.isMajor()) { // major compaction
      // Accumulate cell count and size
      majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
      majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
    } else { // minor compaction
      // Accumulate cell count and size
      compactedCellsCount += getCompactionProgress().totalCompactingKVs;
      compactedCellsSize += getCompactionProgress().totalCompactedSize;
    }
    // At this point the store will use new files for all new scanners.
    // Complete the compaction: archive the old files (the compacted files are not deleted
    // outright but moved from their original location into the archive directory),
    // close their Readers, and update the store size
    completeCompaction(filesToCompact, true); // Archive old files & update store size.

    // Log the result
    logCompactionEndMessage(cr, sfs, compactionStartTime);
    // Return the StoreFile list sfs
    return sfs;
  } finally {
    // Finish the compaction request: the region reports the end of the request,
    // and all files of the request are removed from filesCompacting
    finishCompactionRequest(cr);
  }
}
```

Next, let's walk through the whole flow:
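1. Move the finished files into place, create StoreFiles and Readers for them, and return the StoreFile list sfs:

```java
private List<StoreFile> moveCompatedFilesIntoPlace(
    CompactionRequest cr, List<Path> newFiles) throws IOException {
  // Create the StoreFile list sfs
  List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
  // Iterate over newFiles
  for (Path newFile : newFiles) {
    assert newFile != null;
    // Move newFile into place and create its StoreFile and Reader
    StoreFile sf = moveFileIntoPlace(newFile);
    if (this.getCoprocessorHost() != null) {
      this.getCoprocessorHost().postCompact(this, sf, cr);
    }
    assert sf != null;
    sfs.add(sf);
  }
  return sfs;
}
```

It first creates the StoreFile list sfs, then iterates over the compacted files newFiles, moving each newFile into place and creating its StoreFile and Reader (and calling the coprocessor's postCompact() hook when a coprocessor host is present). The move itself is implemented by moveFileIntoPlace(), whose code is as follows: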
```java
// Package-visible for tests
StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
  // Validate the new file
  validateStoreFile(newFile);
  // Move the file into the right spot
  Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
  // Create the StoreFile and its Reader
  return createStoreFileAndReader(destPath);
}
```

As we can see, the move is actually performed by the commitStoreFile() method of HStore's member variable fs. fs is an HRegionFileSystem, an abstraction of the file system underneath an HRegion that carries out the actual physical file operations. Let's look at its commitStoreFile() method:
```java
/**
 * Move the file from a build/temp location to the main family store directory.
 * @param familyName Family that will gain the file
 * @param buildPath {@link Path} to the file to commit.
 * @param seqNum Sequence Number to append to the file name (less than 0 if no sequence number)
 * @param generateNewName False if you want to keep the buildPath name
 * @return The new {@link Path} of the committed file
 * @throws IOException
 */
private Path commitStoreFile(final String familyName, final Path buildPath,
    final long seqNum, final boolean generateNewName) throws IOException {
  // Get the store directory storeDir for the column family familyName
  Path storeDir = getStoreDir(familyName);
  // If storeDir does not exist in fs and creating it fails, throw an exception
  if (!fs.exists(storeDir) && !createDir(storeDir))
    throw new IOException("Failed creating " + storeDir);

  String name = buildPath.getName();
  if (generateNewName) {
    name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
  }
  Path dstPath = new Path(storeDir, name);
  if (!fs.exists(buildPath)) {
    throw new FileNotFoundException(buildPath.toString());
  }
  LOG.debug("Committing store file " + buildPath + " as " + dstPath);
  // buildPath exists, therefore not doing an exists() check.
  if (!rename(buildPath, dstPath)) {
    throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
  }
  return dstPath;
}
```

This is very simple: it gets the store directory storeDir from the column family name familyName, checks storeDir and creates it if necessary, takes the file name name from buildPath (or generates a unique one when generateNewName is true), builds the destination path dstPath from storeDir and name, and finally moves the file from buildPath to dstPath with rename().
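To make the commit step concrete, here is a minimal local-filesystem sketch of the same idea, assuming java.nio.file in place of HDFS: a file written under a temporary directory is published into the family directory by a single rename. CommitSketch and its method are hypothetical names for illustration only; HBase itself goes through HRegionFileSystem and the Hadoop FileSystem API, and unique-name generation (generateNewName) is left out.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: a local-filesystem analogue of commitStoreFile().
final class CommitSketch {
  private CommitSketch() {}

  /**
   * Moves buildPath into storeDir, keeping its file name, and returns the new path.
   * Assumes both paths live on the same filesystem so the move can be atomic.
   */
  static Path commitStoreFile(Path storeDir, Path buildPath) throws IOException {
    // Create the family directory if it does not exist yet (no-op if it already does).
    Files.createDirectories(storeDir);
    if (!Files.exists(buildPath)) {
      throw new FileNotFoundException(buildPath.toString());
    }
    Path dstPath = storeDir.resolve(buildPath.getFileName());
    // One rename publishes the file: readers see either no file or the complete file.
    return Files.move(buildPath, dstPath, StandardCopyOption.ATOMIC_MOVE);
  }
}
```

The rename is what makes step 2 of the compact() Javadoc atomic; copying the bytes into the store directory instead would let a scanner observe a half-written file.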
```java
private StoreFile createStoreFileAndReader(final StoreFileInfo info)
    throws IOException {
  info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
  StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
      this.family.getBloomFilterType());
  storeFile.createReader();
  return storeFile;
}
```

A StoreFile is a data file of a Store. A Store usually holds one or more StoreFiles, and Reader is an inner class of StoreFile that provides read access to the file's data.
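The StoreFile/Reader relationship can be pictured with a toy model: the outer object knows where the file lives, and a nested Reader, opened by createReader() and dropped by closeReader(), serves the reads. MiniStoreFile below is a hypothetical, heavily simplified stand-in written only for illustration; the real StoreFile.Reader wraps an HFile reader with block cache and Bloom filter support, none of which is modeled here, and this sketch simply loads the whole file into memory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical toy model of a store file with a nested Reader (not the HBase class).
final class MiniStoreFile {
  private final Path path;
  private Reader reader; // null until createReader() is called

  MiniStoreFile(Path path) {
    this.path = path;
  }

  /** Opens the Reader, or returns the one that is already open. */
  Reader createReader() throws IOException {
    if (reader == null) {
      reader = new Reader(Files.readAllBytes(path));
    }
    return reader;
  }

  /** Returns the open Reader, or null if none is open. */
  Reader getReader() {
    return reader;
  }

  /** Drops the Reader; later reads must open a new one. */
  void closeReader() {
    reader = null;
  }

  /** Nested class that serves reads of the file's contents. */
  static final class Reader {
    private final byte[] data;

    private Reader(byte[] data) {
      this.data = data;
    }

    long length() {
      return data.length;
    }

    byte byteAt(int offset) {
      return data[offset];
    }
  }
}
```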
2. Write the compaction record to the WAL:

```java
/**
 * Writes the compaction WAL record.
 * @param filesCompacted Files compacted (input).
 * @param newFiles Files from compaction.
 */
private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
    Collection<StoreFile> newFiles) throws IOException {
  // If the region has no WAL, return immediately
  if (region.getWAL() == null) return;
  // Add the paths of the compacted (input) files to inputPaths
  List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
  for (StoreFile f : filesCompacted) {
    inputPaths.add(f.getPath());
  }
  // Add the paths of the compaction output files to outputPaths
  List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
  for (StoreFile f : newFiles) {
    outputPaths.add(f.getPath());
  }
  // Get the HRegionInfo info
  HRegionInfo info = this.region.getRegionInfo();
  // Build the CompactionDescriptor that describes this compaction
  CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
  // Write a compaction marker into the WAL with WALUtil.writeCompactionMarker()
  WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
}
```

The logic is fairly simple: it collects the paths of the compacted (input) files into inputPaths and those of the output files into outputPaths, gets the HRegionInfo, and builds a CompactionDescriptor with ProtobufUtil.toCompactionDescriptor(), whose code is:
```java
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
    List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
  // compaction descriptor contains relative paths.
  // input / output paths are relative to the store dir
  // store dir is relative to region dir
  CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
      .setTableName(ByteStringer.wrap(info.getTableName()))
      .setEncodedRegionName(ByteStringer.wrap(info.getEncodedNameAsBytes()))
      .setFamilyName(ByteStringer.wrap(family))
      .setStoreHomeDir(storeDir.getName()); //make relative
  for (Path inputPath : inputPaths) {
    builder.addCompactionInput(inputPath.getName()); //relative path
  }
  for (Path outputPath : outputPaths) {
    builder.addCompactionOutput(outputPath.getName());
  }
  builder.setRegionName(ByteStringer.wrap(info.getRegionName()));
  return builder.build();
}
```

Finally, a compaction marker is written into the WAL with WALUtil.writeCompactionMarker(). Let's look at the code:
```java
/**
 * Write the marker that a compaction has succeeded and is about to be committed.
 * This provides info to the HMaster to allow it to recover the compaction if
 * this regionserver dies in the middle (This part is not yet implemented). It also prevents
 * the compaction from finishing if this regionserver has already lost its lease on the log.
 * @param sequenceId Used by WAL to get sequence Id for the waledit.
 */
public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
    final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
  // Get the table name tn from the CompactionDescriptor
  TableName tn = TableName.valueOf(c.getTableName().toByteArray());
  // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
  // Create a WALKey from the encoded region name and the table name tn
  WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
  // Append an entry to the WAL: the table descriptor HTableDescriptor, the WALKey,
  // the compaction WALEdit and the sequence number sequenceId. The compaction WALEdit
  // is built by WALEdit.createCompaction() from the HRegionInfo and CompactionDescriptor.
  log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
  // Sync the log
  log.sync();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
  }
}
```

It appends one entry to the WAL, consisting of the table descriptor HTableDescriptor, the WALKey, the compaction WALEdit and the sequence number sequenceId, and then calls sync(), which is the "Forces sync on WAL" part of step 3 in the compact() Javadoc. The compaction WALEdit is built by WALEdit.createCompaction() from the HRegionInfo and the CompactionDescriptor. The code is as follows:
```java
/**
 * Create a compaction WALEdit
 * @param c
 * @return A WALEdit that has <code>c</code> serialized as its value
 */
public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
  // Serialize the CompactionDescriptor to a byte[]
  byte [] pbbytes = c.toByteArray();
  // Build a KeyValue from the region's row (its start key, via getRowForRegion()), the
  // "METAFAMILY" family, the "HBASE::COMPACTION" qualifier, the current time and the
  // serialized CompactionDescriptor
  KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
      EnvironmentEdgeManager.currentTime(), pbbytes);
  // Add the KeyValue to a new WALEdit and return it
  return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}
```

The code comments are detailed enough, so I won't go over it again.
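Taken together, toCompactionDescriptor() and createCompaction() reduce every path to a name relative to its parent (the store directory name, the file names), pack those names into a descriptor, and serialize it as the value of a single marker cell. The sketch below mimics that with plain Java; CompactionMarkerSketch, its fields and the pipe-separated encode() format are hypothetical, and the naive text encoding merely stands in for protobuf. It is not HBase's wire format.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CompactionDescriptor: it keeps only relative names.
final class CompactionMarkerSketch {
  final String family;
  final String storeHomeDir;                      // store dir name, relative to the region dir
  final List<String> inputs = new ArrayList<>();  // file names, relative to the store dir
  final List<String> outputs = new ArrayList<>();

  CompactionMarkerSketch(String family, Path storeDir, List<Path> inputPaths,
      List<Path> outputPaths) {
    this.family = family;
    this.storeHomeDir = storeDir.getFileName().toString(); // like storeDir.getName()
    for (Path p : inputPaths) {
      inputs.add(p.getFileName().toString());              // like inputPath.getName()
    }
    for (Path p : outputPaths) {
      outputs.add(p.getFileName().toString());
    }
  }

  /** Naive serialization used as the marker value (protobuf in the real code). */
  byte[] encode() {
    String text = family + "|" + storeHomeDir + "|" + String.join(",", inputs)
        + "|" + String.join(",", outputs);
    return text.getBytes(StandardCharsets.UTF_8);
  }
}
```

Keeping only relative names presumably lets the marker be resolved against whatever region directory the replaying server uses, rather than being tied to one absolute root.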
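3. Replace the StoreFiles: remove the compacted files from the StoreFileManager's storefiles and add the compaction output, then remove the compacted files from filesCompacting:

```java
@VisibleForTesting
void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
    final Collection<StoreFile> result) throws IOException {
  // Take the write lock of the ReentrantReadWriteLock, which is exclusive
  this.lock.writeLock().lock();
  try {
    // StoreFileManager.addCompactionResults() removes all the compacted (pre-compaction)
    // files compactedFiles from the StoreFileManager's storefiles, the list of files the
    // Store currently serves, and adds the compaction output result to it
    this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
    // Remove the compacted files from filesCompacting
    filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
  } finally {
    // Release the lock
    this.lock.writeLock().unlock();
  }
}
```

4. Complete the compaction: archive the old files (the compacted files compactedFiles are not deleted outright but archived, i.e. moved from their original location into the archive directory), close their Readers, and update the store size. The code of completeCompaction() is as follows: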
```java
/**
 * <p>It works by processing a compaction that's been written to disk.
 *
 * <p>It is usually invoked at the end of a compaction, but might also be
 * invoked at HStore startup, if the prior execution died midway through.
 *
 * <p>Moving the compacted TreeMap into place means:
 * <pre>
 * 1) Unload all replaced StoreFile, close and collect list to delete.
 * 2) Compute new store size
 * </pre>
 *
 * @param compactedFiles list of files that were compacted
 */
@VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
    throws IOException {
  try {
    // Do not delete old store files until we have sent out notification of
    // change in case old files are still being accessed by outstanding scanners.
    // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
    // scenario that could have happened if continue to hold the lock.
    // Notify the changed-readers observers
    notifyChangedReadersObservers();
    // At this point the store will use new files for all scanners.

    // let the archive util decide if we should archive or delete the files
    LOG.debug("Removing store files after compaction...");
    // Close the Reader of every compacted file in compactedFiles
    for (StoreFile compactedFile : compactedFiles) {
      compactedFile.closeReader(true);
    }
    // Remove compactedFiles from the file system; this is actually an archive operation
    // that moves the old files from their original location into the archive directory
    if (removeFiles) {
      this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
    }
  } catch (IOException e) {
    e = RemoteExceptionHandler.checkIOException(e);
    LOG.error("Failed removing compacted files in " + this
        + ". Files we were trying to remove are " + compactedFiles.toString()
        + "; some of them may have been already removed", e);
  }

  // 4. Compute new store size
  this.storeSize = 0L;
  this.totalUncompressedBytes = 0L;
  // Iterate over the StoreFiles to recompute storeSize and totalUncompressedBytes
  for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
    StoreFile.Reader r = hsf.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + hsf + " has a null Reader");
      continue;
    }
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
  }
}
```
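The size bookkeeping at the end of completeCompaction() is a plain fold over the files the store now serves, skipping any file whose Reader is null. A minimal sketch, assuming a hypothetical FileStats type that carries the two numbers a Reader would report (length() and getTotalUncompressedBytes()), with null standing in for a missing Reader:

```java
import java.util.List;

// Hypothetical sketch of the store-size recomputation; not an HBase class.
final class StoreSizeSketch {
  private StoreSizeSketch() {}

  static final class FileStats {
    final long length;            // on-disk size, as Reader.length() would report
    final long uncompressedBytes; // as Reader.getTotalUncompressedBytes() would report

    FileStats(long length, long uncompressedBytes) {
      this.length = length;
      this.uncompressedBytes = uncompressedBytes;
    }
  }

  /** Returns {storeSize, totalUncompressedBytes}, skipping files without a Reader. */
  static long[] computeStoreSize(List<FileStats> readers) {
    long storeSize = 0L;
    long totalUncompressedBytes = 0L;
    for (FileStats r : readers) {
      if (r == null) {
        // The real code logs a warning here and moves on.
        continue;
      }
      storeSize += r.length;
      totalUncompressedBytes += r.uncompressedBytes;
    }
    return new long[] {storeSize, totalUncompressedBytes};
  }
}
```

Recomputing from the current file list, instead of subtracting the compacted files' sizes, keeps the totals consistent with whatever the StoreFileManager is actually serving, even if archiving some old files failed.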
The old files are removed through HRegionFileSystem.removeStoreFiles():

```java
/**
 * Closes and archives the specified store files from the specified family.
 * @param familyName Family that contains the store files
 * @param storeFiles set of store files to remove
 * @throws IOException if the archiving fails
 */
public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
    throws IOException {
  HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs,
      this.tableDir, Bytes.toBytes(familyName), storeFiles);
}
```

It ultimately delegates to HFileArchiver.archiveStoreFiles(), whose code is as follows:
```java
/**
 * Remove the store files, either by archiving them or outright deletion
 * @param conf {@link Configuration} to examine to determine the archive directory
 * @param fs the filesystem where the store files live
 * @param regionInfo {@link HRegionInfo} of the region hosting the store files
 * @param family the family hosting the store files
 * @param compactedFiles files to be disposed of. No further reading of these files should be
 *          attempted; otherwise likely to cause an {@link IOException}
 * @throws IOException if the files could not be correctly disposed.
 */
public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
    Path tableDir, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {

  // sometimes in testing, we don't have rss, so we need to check for that
  if (fs == null) {
    LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
        + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
    deleteStoreFilesWithoutArchiving(compactedFiles);
    return;
  }

  // short circuit if we don't have any files to delete
  // If compactedFiles is empty, return immediately
  if (compactedFiles.size() == 0) {
    LOG.debug("No store files to dispose, done!");
    return;
  }

  // build the archive path
  if (regionInfo == null || family == null) throw new IOException(
      "Need to have a region and a family to archive from.");

  // Get the archive directory for this store
  Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);

  // make sure we don't archive if we can't and that the archive dir exists
  // Create the directory
  if (!fs.mkdirs(storeArchiveDir)) {
    throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
        + Bytes.toString(family) + ", deleting compacted files instead.");
  }

  // otherwise we attempt to archive the store files
  if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");

  // Wrap the storefile into a File
  StoreToFile getStorePath = new StoreToFile(fs);
  Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);

  // do the actual archive
  // Archive the files via resolveAndArchive()
  if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
    throw new IOException("Failed to archive/delete all the files for region:"
        + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
        + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
  }
}
```

Call after call! Let's keep going and look at the key code in resolveAndArchive():
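```java
// If it is a file
if (file.isFile()) {
  // attempt to archive the file
  if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
    LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
    failures.add(file);
  }
}
```

resolveAndArchiveFile() does not simply delete the file: it moves the old store file into the archive directory with rename(). The key part below handles a name clash: if a file with the same name is already in the archive, that existing file is first renamed to a timestamped backup name (or deleted if the rename fails) to make room for the new one: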
```java
// move the archive file to the stamped backup
Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
if (!fs.rename(archiveFile, backedupArchiveFile)) {
  LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
      + ", deleting existing file in favor of newer.");
  // try to delete the existing file, if we can't rename it
  if (!fs.delete(archiveFile, false)) {
    throw new IOException("Couldn't delete existing archive file (" + archiveFile
        + ") or rename it to the backup file (" + backedupArchiveFile
        + ") to make room for similarly named file.");
  }
}
```

5. Finish the compaction request: the region reports the end of the compaction request, and all files of the request are removed from filesCompacting:
```java
private void finishCompactionRequest(CompactionRequest cr) {
  // The region reports that the compaction request has ended
  this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
  if (cr.isOffPeak()) {
    offPeakCompactionTracker.set(false);
    cr.setOffPeak(false);
  }
  // Remove all files of the request from filesCompacting
  synchronized (filesCompacting) {
    filesCompacting.removeAll(cr.getFiles());
  }
}
```

Readers can analyze this one on their own; I won't go into further detail.
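To tie the steps together, here is a minimal local-filesystem sketch of the ordering the compact() Javadoc relies on: write the output under .tmp, publish it with a rename, append and force a marker to a log, and only then move the inputs into an archive directory. Everything in it is hypothetical and simplified: a plain text file named .wal stands in for the WAL, java.nio.file replaces HDFS, and OrderedCompactionSketch, the region/archive layout and the marker format are illustrative names, not HBase's. It only shows why each step can be retried safely; leases and fencing are not modeled.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch of the crash-safe ordering described in the compact() Javadoc.
final class OrderedCompactionSketch {
  private OrderedCompactionSketch() {}

  static Path compact(Path regionDir, String family, List<Path> inputs, byte[] merged,
      String outputName) throws IOException {
    Path storeDir = Files.createDirectories(regionDir.resolve(family));
    Path tmpDir = Files.createDirectories(regionDir.resolve(".tmp"));
    Path archiveDir = Files.createDirectories(regionDir.resolve("archive").resolve(family));

    // Step 1: write the compaction output under region/.tmp.
    Path tmpFile = Files.write(tmpDir.resolve(outputName), merged);

    // Step 2: publish it in the store directory with a single atomic rename.
    Path output = Files.move(tmpFile, storeDir.resolve(outputName),
        StandardCopyOption.ATOMIC_MOVE);

    // Step 3: append a marker naming the inputs and the output, then force it to disk.
    StringBuilder names = new StringBuilder();
    for (Path in : inputs) {
      if (names.length() > 0) {
        names.append(',');
      }
      names.append(in.getFileName());
    }
    String marker = "COMPACTION in=" + names + " out=" + outputName + "\n";
    ByteBuffer buf = ByteBuffer.wrap(marker.getBytes(StandardCharsets.UTF_8));
    try (FileChannel wal = FileChannel.open(regionDir.resolve(".wal"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
      while (buf.hasRemaining()) {
        wal.write(buf);
      }
      wal.force(true); // stands in for the WAL sync()
    }

    // Step 4: only now retire the inputs by moving them into the archive directory.
    // Skipping files that are already gone keeps a replay of this step idempotent.
    for (Path in : inputs) {
      if (Files.exists(in)) {
        Files.move(in, archiveDir.resolve(in.getFileName()),
            StandardCopyOption.REPLACE_EXISTING);
      }
    }
    return output;
  }
}
```

If the process stops after step 2, the new file merely duplicates data the inputs still hold; if it stops after step 3, replaying the marker only has to archive whichever inputs are still present, which is why step 4 skips missing files.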