How data is read from HBase into MR

TableMapReduceUtil.initTableMapperJob configures the input side of the job. One of the arguments passed to it is the input format class, TableInputFormat.class, which it sets on the job:

job.setInputFormatClass(inputFormatClass);

TableInputFormatBase, the parent class of TableInputFormat, creates a TableRecordReader:

if (trr == null) {
  trr = new TableRecordReader();
}
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setHTable(table);
return trr;
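
The snippet above narrows the scan to the split's own row range. As a rough illustration of what that means for each reader, here is a plain-JDK model that uses no HBase classes: a TreeMap stands in for the sorted table, the start row is treated as inclusive and the stop row as exclusive, and an empty string means "unbounded". The class name, sample rows, and split boundaries are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SplitRangeSketch {

    /** Hypothetical sample table; String ordering stands in for HBase's byte ordering (ASCII keys only). */
    public static NavigableMap<String, String> sampleTable() {
        NavigableMap<String, String> table = new TreeMap<>();
        for (String row : new String[] {"a1", "b7", "c3", "d9"}) {
            table.put(row, "value-of-" + row);
        }
        return table;
    }

    /** Rows one split's reader would see: startRow inclusive, stopRow exclusive, "" meaning unbounded. */
    public static List<String> rowsForSplit(NavigableMap<String, String> table,
                                            String startRow, String stopRow) {
        NavigableMap<String, String> view = table;
        if (!startRow.isEmpty()) {
            view = view.tailMap(startRow, true);   // keep rows >= startRow
        }
        if (!stopRow.isEmpty()) {
            view = view.headMap(stopRow, false);   // keep rows < stopRow
        }
        return new ArrayList<>(view.keySet());
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = sampleTable();
        // Hypothetical split boundaries, one {start, stop} pair per split.
        String[][] splits = {{"", "b"}, {"b", "d"}, {"d", ""}};
        for (String[] split : splits) {
            System.out.println("[" + split[0] + ", " + split[1] + ") -> "
                    + rowsForSplit(table, split[0], split[1]));
        }
    }
}
```

Because the stop row of one split is the start row of the next, every row in this model is seen by exactly one reader.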

The record reader in use, then, is TableRecordReader. It keeps reading forward, one row at a time:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return this.recordReaderImpl.nextKeyValue();
}

Looking at the implementation class behind recordReaderImpl shows that each K-V pair is a row key and a Result object:

public boolean nextKeyValue() throws IOException, InterruptedException {
  if (key == null) key = new ImmutableBytesWritable();
  // this shows that value is a Result object
  if (value == null) value = new Result();
  try {
    try {
      // read the next row
      value = this.scanner.next();
      if (logScannerActivity) {
        rowcount ++;
        if (rowcount >= logPerRowCount) {
          long now = System.currentTimeMillis();
          LOG.info("Mapper took " + (now-timestamp)
            + "ms to process " + rowcount + " rows");
          timestamp = now;
          rowcount = 0;
        }
      }
    } catch (IOException e) {
      // do not retry if the exception tells us not to do so
      if (e instanceof DoNotRetryIOException) {
        throw e;
      }
      // try to handle all other IOExceptions by restarting
      // the scanner, if the second call fails, it will be rethrown
      LOG.info("recovered from " + StringUtils.stringifyException(e));
      if (lastSuccessfulRow == null) {
        LOG.warn("We are restarting the first next() invocation," +
            " if your mapper has restarted a few other times like this" +
            " then you should consider killing this job and investigate" +
            " why it's taking so long.");
      }
      if (lastSuccessfulRow == null) {
        restart(scan.getStartRow());
      } else {
        restart(lastSuccessfulRow);
        scanner.next();    // skip presumed already mapped row
      }
      value = scanner.next();
      numRestarts++;
    }
    if (value != null && value.size() > 0) {
      // getRow() returns the row key
      key.set(value.getRow());
      lastSuccessfulRow = key.get();
      return true;
    }

    updateCounters();
    return false;
  } catch (IOException ioe) {
    if (logScannerActivity) {
      long now = System.currentTimeMillis();
      LOG.info("Mapper took " + (now-timestamp)
        + "ms to process " + rowcount + " rows");
      LOG.info(ioe);
      String lastRow = lastSuccessfulRow == null ?
        "null" : Bytes.toStringBinary(lastSuccessfulRow);
      LOG.info("lastSuccessfulRow=" + lastRow);
    }
    throw ioe;
  }
}

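nextKeyValue() advances the reader one row at a time by calling this.scanner.next(), and the value it produces is a Result object. The key is set from value.getRow(), which is the row key. Each record therefore reaches the Mapper with the row key as K and the Result object as V. If next() fails with an IOException other than DoNotRetryIOException, the reader restarts the scanner at the last successfully read row and skips that row, because it has already been handed to the Mapper.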
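
The restart branch in the listing above is easy to misread, so here is a small plain-JDK model of it. It is only a sketch: FlakyScanner is a hypothetical stand-in for the real scanner, a TreeMap stands in for the table, and the failure is simulated with an UncheckedIOException to keep the code short. The point it tries to show is why the extra next() call is needed after restarting at lastSuccessfulRow: the restarted scan begins at that row inclusively, so the first row it returns was already delivered.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RestartSketch {

    /** Hypothetical scanner: returns rows >= startRow in order and fails once after failAfter reads. */
    static final class FlakyScanner {
        private final Iterator<String> rows;
        private final int failAfter;   // a negative value disables the simulated failure
        private int reads = 0;

        FlakyScanner(NavigableMap<String, String> table, String startRow, int failAfter) {
            this.rows = table.tailMap(startRow, true).keySet().iterator();
            this.failAfter = failAfter;
        }

        String next() {
            if (reads++ == failAfter) {
                throw new UncheckedIOException(new IOException("simulated scanner failure"));
            }
            return rows.hasNext() ? rows.next() : null;
        }
    }

    /** Hypothetical sample table with four rows. */
    public static NavigableMap<String, String> sampleTable() {
        NavigableMap<String, String> table = new TreeMap<>();
        for (String row : new String[] {"r1", "r2", "r3", "r4"}) {
            table.put(row, "value-of-" + row);
        }
        return table;
    }

    /** Reads every row from startRow onward, restarting once if the scanner fails. */
    public static List<String> readAll(NavigableMap<String, String> table,
                                       String startRow, int failAfter) {
        List<String> delivered = new ArrayList<>();
        FlakyScanner scanner = new FlakyScanner(table, startRow, failAfter);
        String lastSuccessfulRow = null;
        while (true) {
            String row;
            try {
                row = scanner.next();
            } catch (UncheckedIOException e) {
                if (lastSuccessfulRow == null) {
                    // Nothing delivered yet: start over from the beginning of the range.
                    scanner = new FlakyScanner(table, startRow, -1);
                } else {
                    // Restart at the last delivered row, then skip it.
                    scanner = new FlakyScanner(table, lastSuccessfulRow, -1);
                    scanner.next();
                }
                row = scanner.next();
            }
            if (row == null) {
                return delivered;
            }
            lastSuccessfulRow = row;
            delivered.add(row);
        }
    }

    public static void main(String[] args) {
        // The scanner fails on its third read, after r1 and r2 were delivered.
        System.out.println(readAll(sampleTable(), "", 2)); // prints [r1, r2, r3, r4]
    }
}
```

Without the skipping call, r2 would be delivered twice in this model, which corresponds to a row being mapped twice.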

So inside the Mapper, a call like the following retrieves the latest version of the cell in column family cf, qualifier line:

Cell cell = value.getColumnLatestCell("cf".getBytes(), "line".getBytes());