2021SC@SDUSC
1. Preface
In this post we briefly walk through the main flow of writing Cells into an HFile.
2. A Brief Analysis
One place where Cell writes to an HFile are initiated is the memstore flush, in StoreFlusher's performFlush() method:
/**
 * Performs memstore flush, writing data from scanner into sink.
 *
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 */
protected void performFlush(InternalScanner scanner,
    Compactor.CellSink sink, long smallestReadPoint) throws IOException {
  int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
  List<Cell> kvs = new ArrayList<Cell>();
  boolean hasMore;
  do {
    hasMore = scanner.next(kvs, compactionKVMax);
    if (!kvs.isEmpty()) {
      for (Cell c : kvs) {
        // If we know that this KV is going to be included always, then let us
        // set its memstoreTS to 0. This will help us save space when writing to
        // disk.
        sink.append(c);
      }
      kvs.clear();
    }
  } while (hasMore);
}
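Note that each next() call returns at most compactionKVMax Cells, so the batch size is bounded by configuration. A minimal sketch of tuning it (the value 10 is the shipped default; this is an illustration, not HBase source):
// Hedged sketch: HConstants.COMPACTION_KV_MAX is the key "hbase.hstore.compaction.kv.max";
// lowering it bounds per-iteration memory during flushes and compactions.
Configuration conf = HBaseConfiguration.create();
conf.setInt(HConstants.COMPACTION_KV_MAX, 10); // HConstants.COMPACTION_KV_MAX_DEFAULT is 10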
performFlush() loops over the Cells and writes them into the sink, which is of type Compactor.CellSink. In its caller, DefaultStoreFlusher's flushSnapshot(), HStore's createWriterInTmp() is first called to create a StoreFile.Writer instance named writer, which is then passed into performFlush() as the sink parameter:
// Write the map out to the disk
writer = store.createWriterInTmp(
    cellsCount, store.getFamily().getCompression(), false, true, true);
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
IOException e = null;
try {
  performFlush(scanner, writer, smallestReadPoint);
} catch (IOException ioe) {
  e = ioe;
  // throw the exception out
  throw ioe;
} finally {
  if (e != null) {
    writer.close();
  } else {
    finalizeWriter(writer, cacheFlushId, status);
  }
}
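On the success path, finalizeWriter() seals the flushed file. A hedged sketch of what it roughly does in StoreFlusher (paraphrased from the HBase source of this era, not quoted verbatim):
// Hedged paraphrase of StoreFlusher.finalizeWriter(): record the flush sequence id
// as file metadata so the HFile knows how far the memstore was flushed, then close.
protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
    MonitoredTask status) throws IOException {
  writer.appendMetadata(cacheFlushSeqNum, false); // false: not a major compaction output
  status.setStatus("Flushing " + store + ": closing flushed file");
  writer.close();
}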
So how is this StoreFile.Writer-typed writer created? Stepping into HStore's createWriterInTmp() method:
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
    this.getFileSystem())
        .withFilePath(fs.createTempName())
        .withComparator(comparator)
        .withBloomType(family.getBloomFilterType())
        .withMaxKeyCount(maxKeyCount)
        .withFavoredNodes(favoredNodes)
        .withFileContext(hFileContext)
        .build();
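The builder pattern lets callers set only the options they care about. A hedged standalone sketch of driving the same builder directly (conf, fs and tmpPath are assumed to exist; the block size and key count are picked for illustration):
// Hypothetical direct use of StoreFile.WriterBuilder; everything except the
// builder API itself is an assumption for illustration.
HFileContext ctx = new HFileContextBuilder().withBlockSize(64 * 1024).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs)
    .withFilePath(tmpPath)
    .withMaxKeyCount(10000)
    .withFileContext(ctx)
    .build();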
At the end of StoreFile.WriterBuilder's build() method, a StoreFile.Writer instance is created; in its constructor, an HFile.Writer instance named writer is built:
writer = HFile.getWriterFactory(conf, cacheConf)
    .withPath(fs, path)
    .withComparator(comparator)
    .withFavoredNodes(favoredNodes)
    .withFileContext(fileContext)
    .create();
This HFile.Writer instance is obtained through HFile's WriterFactory:
/**
 * Returns the factory to be used to create {@link HFile} writers
 */
public static final WriterFactory getWriterFactory(Configuration conf,
    CacheConfig cacheConf) {
  int version = getFormatVersion(conf);
  switch (version) {
  case 2:
    return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
  case 3:
    return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
  default:
    throw new IllegalArgumentException("Cannot create writer for HFile " +
        "format version " + version);
  }
}
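The branch taken is decided by the HFile format version read from the configuration. A minimal sketch of selecting version 3 (HFile.FORMAT_VERSION_KEY is the constant for the key "hfile.format.version"):
Configuration conf = HBaseConfiguration.create();
conf.setInt(HFile.FORMAT_VERSION_KEY, 3); // "hfile.format.version" -> WriterFactoryV3
HFile.WriterFactory factory = HFile.getWriterFactory(conf, new CacheConfig(conf));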
HBase provides two WriterFactory implementations: HFileWriterV2.WriterFactoryV2 and HFileWriterV3.WriterFactoryV3. Looking at HFileWriterV2.WriterFactoryV2, the HFile.Writer it creates is in fact an HFileWriterV2:
@Override
public Writer createWriter(FileSystem fs, Path path,
    FSDataOutputStream ostream,
    KVComparator comparator, HFileContext context) throws IOException {
  context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
  return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
      comparator, context);
}
So when the sink from the beginning appends Cells in a loop, what ultimately gets invoked is HFileWriterV2's append() method. StoreFile.Writer's append() looks like this:
public void append(final Cell cell) throws IOException {
  appendGeneralBloomfilter(cell);
  appendDeleteFamilyBloomFilter(cell);
  writer.append(cell);
  trackTimestamps(cell);
}
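To make the chain concrete, here is a hedged, self-contained sketch of writing Cells through an HFile.Writer obtained from the factory; the class name, file path and key/value contents are made up for illustration:
// Hedged sketch, not HBase source: an end-to-end HFile write via the factory.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    HFileContext ctx = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    HFile.Writer w = HFile.getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, new Path("/tmp/example.hfile"))
        .withFileContext(ctx)
        .create();
    // Keys must be appended in comparator order, as append() enforces via checkKey().
    w.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v1")));
    w.append(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v2")));
    w.close();
  }
}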
The writer field used in StoreFile.Writer's append() above is exactly the HFileWriterV2 instance whose creation we traced earlier. Next, let's take a closer look at HFileWriterV2's append() method:
/**
 * Add key/value to file. Keys must be added in an order that agrees with the
 * Comparator passed on construction.
 *
 * @param cell Cell to add. Cannot be empty nor null.
 * @throws IOException
 */
@Override
public void append(final Cell cell) throws IOException {
  byte[] value = cell.getValueArray();
  int voffset = cell.getValueOffset();
  int vlength = cell.getValueLength();
  // checkKey uses comparator to check we are writing in order.
  boolean dupKey = checkKey(cell);
  checkValue(value, voffset, vlength);
  if (!dupKey) {
    checkBlockBoundary();
  }
  if (!fsBlockWriter.isWriting()) {
    newBlock();
  }
  fsBlockWriter.write(cell);
  totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
  totalValueLength += vlength;
  // Are we the first key in this block?
  if (firstCellInBlock == null) {
    // If cell is big, block will be closed and this firstCellInBlock reference will only last
    // a short while.
    firstCellInBlock = cell;
  }
  // TODO: What if cell is 10MB and we write infrequently? We'll hold on to the cell here
  // indefinetly?
  lastCell = cell;
  entryCount++;
  this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
}
The logic is roughly as follows:
- Get value, voffset and vlength from the Cell
- Call checkKey() on the given Cell to ensure keys arrive in order; dupKey being true means the key is unchanged from the previous Cell, false means it has changed (see the sketch after this list)
- Call checkValue() to ensure the value is not empty
- Only when the key changes is the block boundary checked, via checkBlockBoundary()
- If a new data block is needed, request one via newBlock(); the criterion is fsBlockWriter's isWriting() returning false
- Call fsBlockWriter's write() to write the Cell
- Accumulate the key length into totalKeyLength and the value length into totalValueLength
- If applicable, record the first Cell written into this data block
- Record the last written Cell
- Increment entryCount
- Update maxMemstoreTS
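checkKey() itself is not shown above. A minimal sketch of the ordering/duplicate check it performs, assuming a lastCell field and the construction-time comparator (this paraphrases, rather than quotes, the real implementation in AbstractHFileWriter):
// Hedged paraphrase of the ordering check: out-of-order keys are rejected,
// keys equal to the previous one are flagged as duplicates.
protected boolean checkKey(final Cell cell) throws IOException {
  boolean isDuplicateKey = false;
  if (lastCell != null) {
    int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
    if (keyComp > 0) {
      throw new IOException("Added a key not lexically larger than previous");
    } else if (keyComp == 0) {
      isDuplicateKey = true;
    }
  }
  return isDuplicateKey;
}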
The checkBlockBoundary() method that checks the block boundary looks like this:
/**
 * At a block boundary, write all the inline blocks and opens new block.
 *
 * @throws IOException
 */
protected void checkBlockBoundary() throws IOException {
  if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
    return;
  finishBlock();
  writeInlineBlocks(false);
  newBlock();
}
It checks how much fsBlockWriter has already written. If that amount is smaller than the block size defined in hFileContext, it returns immediately; otherwise the block has reached or exceeded the threshold (which is configurable, see the sketch after this list), and the following steps are taken:
- Call finishBlock() to finish the previous block
- Call writeInlineBlocks() to write the inline blocks
- Call newBlock() to start a new block
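The threshold compared against comes from the HFileContext. A minimal sketch of where it can be set (64 KB is HBase's default block size; the descriptor below is hypothetical, for illustration):
// Hedged sketch: the per-family block size ends up in hFileContext.getBlocksize().
HColumnDescriptor family = new HColumnDescriptor("f");
family.setBlocksize(64 * 1024); // default is 64 KB
// ...or set directly when building an HFileContext:
HFileContext ctx = new HFileContextBuilder().withBlockSize(64 * 1024).build();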
That is all; if there are any mistakes, corrections are welcome.