TableMapReduceUtil.initTableMapperJob configures the input side of the job. One of the arguments passed to it is the input format class, TableInputFormat.class, which it sets on the job:
job.setInputFormatClass(inputFormatClass);
TableInputFormatBase, the parent class of TableInputFormat, creates a TableRecordReader:
if (trr == null) {
  trr = new TableRecordReader();
}
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setHTable(table);
return trr;
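What the setStartRow/setStopRow calls do to the scan can be modeled without HBase. The sketch below is only an illustration: it assumes the table is a sorted map with ASCII string row keys, and SplitScanSketch and scanRange are hypothetical names, not HBase API. It follows the HBase convention that the start row is inclusive, the stop row is exclusive, and an empty key means "unbounded".

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a table: row keys kept in sorted order.
class SplitScanSketch {
    // Returns the rows in [startRow, stopRow). An empty string means
    // "no bound on this side", mirroring HBase's empty byte array.
    static NavigableMap<String, String> scanRange(
            NavigableMap<String, String> table, String startRow, String stopRow) {
        NavigableMap<String, String> fromStart =
                startRow.isEmpty() ? table : table.tailMap(startRow, true);
        return stopRow.isEmpty() ? fromStart : fromStart.headMap(stopRow, false);
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        for (int i = 1; i <= 5; i++) {
            table.put("row" + i, "value" + i);
        }
        // One split covering [row2, row4): the reader for it sees row2 and row3 only.
        System.out.println(scanRange(table, "row2", "row4").keySet());
    }
}
```

Each split gets its own copy of the scan narrowed this way, so every mapper task reads only the rows that belong to its split.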
So the record reader in use is TableRecordReader. It keeps advancing to the next row:
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return this.recordReaderImpl.nextKeyValue();
}
Looking at the implementation class shows that each key-value pair is a row key and a Result object:
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (key == null) key = new ImmutableBytesWritable();
  // this shows that the value is a Result object
  if (value == null) value = new Result();
  try {
    try {
      // read the next row
      value = this.scanner.next();
      if (logScannerActivity) {
        rowcount ++;
        if (rowcount >= logPerRowCount) {
          long now = System.currentTimeMillis();
          LOG.info("Mapper took " + (now-timestamp)
            + "ms to process " + rowcount + " rows");
          timestamp = now;
          rowcount = 0;
        }
      }
    } catch (IOException e) {
      // do not retry if the exception tells us not to do so
      if (e instanceof DoNotRetryIOException) {
        throw e;
      }
      // try to handle all other IOExceptions by restarting
      // the scanner, if the second call fails, it will be rethrown
      LOG.info("recovered from " + StringUtils.stringifyException(e));
      if (lastSuccessfulRow == null) {
        LOG.warn("We are restarting the first next() invocation," +
          " if your mapper has restarted a few other times like this" +
          " then you should consider killing this job and investigate" +
          " why it's taking so long.");
      }
      if (lastSuccessfulRow == null) {
        restart(scan.getStartRow());
      } else {
        restart(lastSuccessfulRow);
        scanner.next(); // skip presumed already mapped row
      }
      value = scanner.next();
      numRestarts++;
    }
    if (value != null && value.size() > 0) {
      // getRow() here returns the row key
      key.set(value.getRow());
      lastSuccessfulRow = key.get();
      return true;
    }
    updateCounters();
    return false;
  } catch (IOException ioe) {
    if (logScannerActivity) {
      long now = System.currentTimeMillis();
      LOG.info("Mapper took " + (now-timestamp)
        + "ms to process " + rowcount + " rows");
      LOG.info(ioe);
      String lastRow = lastSuccessfulRow == null ?
        "null" : Bytes.toStringBinary(lastSuccessfulRow);
      LOG.info("lastSuccessfulRow=" + lastRow);
    }
    throw ioe;
  }
}
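nextKeyValue() advances the reader one row at a time by calling this.scanner.next(), and the value is the Result that call returns. The key is set to that row's row key. In other words, each record reaches the mapper with the row key as K and the Result object as V.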
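The restart branch in that method is easy to skim past, so here is a simplified model of it in plain Java. This is a sketch under stated assumptions, not the HBase implementation: RestartingReaderSketch, failOnce, and readAll are hypothetical names, the table is a sorted map of strings, and a failure is injected the first time a chosen row is read. It only mirrors the control flow above: on an IOException the scanner is reopened at the last successful row, that row is skipped because the mapper has already seen it, and reading continues.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

// Simplified, HBase-free model of the retry logic in nextKeyValue().
class RestartingReaderSketch {
    private final NavigableMap<String, String> table;
    private final Set<String> failOnce; // rows whose first read fails (fault injection)
    private Iterator<Map.Entry<String, String>> scanner;
    private String lastSuccessfulRow;
    String key;
    String value;
    int numRestarts;

    RestartingReaderSketch(NavigableMap<String, String> table, Set<String> failOnce) {
        this.table = table;
        this.failOnce = new HashSet<>(failOnce);
        this.scanner = table.entrySet().iterator();
    }

    // Stand-in for scanner.next(): returns null at the end of the table.
    private Map.Entry<String, String> next() throws IOException {
        if (!scanner.hasNext()) return null;
        Map.Entry<String, String> e = scanner.next();
        if (failOnce.remove(e.getKey())) {
            throw new IOException("simulated failure at " + e.getKey());
        }
        return e;
    }

    // Stand-in for restart(row): reopen the scan at firstRow, inclusive.
    private void restart(String firstRow) {
        scanner = table.tailMap(firstRow, true).entrySet().iterator();
    }

    boolean nextKeyValue() throws IOException {
        Map.Entry<String, String> e;
        try {
            e = next();
        } catch (IOException ex) {
            if (lastSuccessfulRow == null) {
                restart(table.firstKey()); // nothing mapped yet: start over
            } else {
                restart(lastSuccessfulRow);
                next(); // skip the row the mapper has already received
            }
            e = next(); // a second failure here propagates to the caller
            numRestarts++;
        }
        if (e != null) {
            key = e.getKey(); // the row key becomes K
            value = e.getValue(); // the row contents become V
            lastSuccessfulRow = key;
            return true;
        }
        return false;
    }

    // Drains the reader and returns the keys in the order they were delivered.
    static List<String> readAll(RestartingReaderSketch reader) {
        List<String> keys = new ArrayList<>();
        try {
            while (reader.nextKeyValue()) keys.add(reader.key);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return keys;
    }
}
```

With rows a, b, c and a failure injected on b, readAll delivers a, b, c exactly once each and numRestarts ends at 1, which is the point of skipping the last successful row after a restart.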
So inside the Mapper, a call like the following retrieves the contents of a cell:
Cell cell = value.getColumnLatestCell("cf".getBytes(), "line".getBytes());
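One detail in that call: String.getBytes() with no argument uses the platform default charset, while HBase's Bytes.toBytes(String) encodes as UTF-8. For ASCII names like "cf" and "line" the two usually agree, but naming the charset explicitly removes the dependency on the machine's settings. A minimal plain-Java sketch of the idea, where QualifierBytesSketch, toUtf8, and fromUtf8 are hypothetical helper names:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers: encode names and decode cell values with a fixed charset.
class QualifierBytesSketch {
    // Encodes a family or qualifier name as UTF-8, independent of the platform default.
    static byte[] toUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes bytes that were written as UTF-8 text, such as a cell value.
    static String fromUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The same reasoning applies when turning the cell's value bytes back into a String inside the mapper: decode with the charset the data was written in.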