2021SC@SDUSC Hbase(十四)项目代码分析-HFile写入Cell

2021SC@SDUSC

一、前言

        本文我们来简单介绍下HFile写入Cell的主体流程

二、浅析

        HFile文件Cell写入的发起位置,一个就是Memstore flush时,StoreFlusher的preformFlush()方法:

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * 
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param smallestReadPoint Smallest read point used for the flush.
   */
  protected void performFlush(InternalScanner scanner,
      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
    int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (!kvs.isEmpty()) {
        for (Cell c : kvs) {
          // If we know that this KV is going to be included always, then let us
          // set its memstoreTS to 0. This will help us save space when writing to
          // disk.

          sink.append(c);
        }
        kvs.clear();
      }
    } while (hasMore);
  }

        它会循环地将Cell写入Compactor.CellSink类型的sink。在performFlush()的调用者DefaultStoreFlusher的flushSnapshot()中,首先会调用HStore的createWriterInTmp()生成一个StoreFile.Writer的实例writer,然后将这个writer作为参数sink传入performFlush():

        // Write the map out to the disk
        writer = store.createWriterInTmp(
            cellsCount, store.getFamily().getCompression(), false, true, true);
        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }

那么StoreFile.Writer类型的writer是如何生成的呢?进入到HStore的createWriterInTmp()方法中:


    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())
            .withFilePath(fs.createTempName())
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .build();

在StoreFile.WriterBuilder的build()方法最后,会new一个StoreFile.Writer实例,在其构造方法中,会生成一个HFile.Writer实例writer:

      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();

而这个HFile.Writer实例writer则是通过HFile中WriterFactory获取的:

  /**
   * Returns the factory to be used to create {@link HFile} writers
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }

HBase中包含两种类型的WriterFactoryV2:HFileWriterV2.WriterFactoryV2和HFileWriterV3 .WriterFactoryV3,我们看一下HFileWriterV2.WriterFactoryV2,它create的HFile.Writer实例其实为HFileWriterV2:

    @Override
    public Writer createWriter(FileSystem fs, Path path, 
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, 
          comparator, context);
      }
    }

所以最开始的sink调用append循环写入Cell,实际上最终调用的是HFileWriterV2的append()方法:

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

这个writer的实例化就是我们上面讲到的HFileWriterV2的实例化。接下来,我们重点地看下HFileWriterV2的append()方法:

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    
	byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }
    if (!fsBlockWriter.isWriting()) {
      newBlock();
    }
    fsBlockWriter.write(cell);
    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }
 
    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
    // indefinetly?
    lastCell = cell;
    
    entryCount++;
    
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }

逻辑大体如下:

  1. 从Cell中获取value、voffset和vlength
  2. 调用checkKey()方法,检测给定的Cell,确保key有序,dupKey为true说明key未更改,反之则是已更改
  3. 调用checkValue(),检测value,确保其不为空
  4. 换key时才检测blokc边界,调用checkBlockBoundary()方法
  5. 若需申请新的数据块,调用newBlock()方法申请,判断依据是fsBlockWriter的isWriting()返回为false
  6. 调用fsBlockWriter的write()方法写入Cell
  7. 累加key长度totalKeyLength和Value长度totalValueLength
  8. 如果需要的话,标记该数据块中写入的第一个key
  9. 标记上次写入Cell
  10. entryCount+1
  11. 更新maxMemstoreTS

检测block边界的checkBlockBoundary()方法代码如下:

  /**
   * At a block boundary, write all the inline blocks and opens new block.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
	if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;
    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

它会判断fsBlockWriter已写入大小,如果fsBlockWriter已写入大小小于hFileContext中定义的块大小,则直接返回,否则说明block已达到或超过阈值,需进行以下操作:

  1. 调用finishBlock()方法,结束上一个block
  2. 调用writeInlineBlocks()方法,写入InlineBlocks
  3. 调用newBlock()方法,申请新块

以上,如有错误,欢迎指正。

上一篇:springboot整合jsr303校验规则


下一篇:HttpServletResponse与HttpServletRequest