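In user-profiling scenarios it is common to develop many tags and store each one as a qualifier. Some of them eventually fall out of use and need to be retired, but the delete APIs that HBase provides all operate on a single row, so clearing all the data of one qualifier across a table is not easy. This post presents an implementation based on a coprocessor.
HBase exposes the following five hooks in the compaction process where custom code can be plugged in: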
- preCompactSelection
- postCompactSelection
- preCompactScannerOpen
- preCompact
- postCompact
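Of these, preCompact is called after the StoreScanner has been created and before any data is read. The idea is therefore to proxy the scanner: create a new scanner that wraps the original one and implements its next method, so that the raw cells it reads can be processed before the compaction writes them out.
The code is below. It is modeled on the ValueRewritingObserver class in the hbase-examples module: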
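import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QualifierDeletingObserver implements RegionObserver, RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(QualifierDeletingObserver.class);
  private byte[] qualifierToDelete = null;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
    return Optional.of(this);
  }

  @Override
  public void start(
      @SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    // "qualifier.to.delete" arrives as a key=value argument of the coprocessor table attribute
    qualifierToDelete = Bytes.toBytes(renv.getConfiguration().get("qualifier.to.delete"));
    LOG.info("Dropping qualifier {} during compaction", Bytes.toStringBinary(qualifierToDelete));
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
    // Wrap the compaction scanner so cells can be filtered before they are written out
    return new InternalScanner() {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = scanner.next(result, scannerContext);
        // removeIf drops every matching cell; calling remove(i) in a forward index loop
        // would skip the cell that follows each removed one
        result.removeIf(cell -> CellUtil.matchingQualifier(cell, qualifierToDelete));
        return ret;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };
  }
}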
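Filtering the result list inside next needs some care. If elements are removed by index while the index keeps advancing, the element that slides into the freed slot is never checked. This matters when two matching cells are adjacent, for example when a column keeps several versions. The sketch below is a stand-alone illustration only: it uses plain strings as stand-ins for cell qualifiers, and the class RemovalDemo and its methods are hypothetical names, not HBase code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RemovalDemo {
    // Forward index loop: after remove(i) the next element shifts into slot i,
    // and the following i++ steps over it without checking it.
    static List<String> removeForward(List<String> qualifiers, String target) {
        List<String> result = new ArrayList<>(qualifiers);
        for (int i = 0; i < result.size(); i++) {
            if (result.get(i).equals(target)) {
                result.remove(i);
            }
        }
        return result;
    }

    // removeIf visits every element exactly once, so no match is skipped.
    static List<String> removeWithRemoveIf(List<String> qualifiers, String target) {
        List<String> result = new ArrayList<>(qualifiers);
        result.removeIf(q -> q.equals(target));
        return result;
    }

    public static void main(String[] args) {
        // Two adjacent "q1" entries stand in for two versions of the same column.
        List<String> row = Arrays.asList("q1", "q1", "q2");
        System.out.println(removeForward(row, "q1"));      // prints [q1, q2]
        System.out.println(removeWithRemoveIf(row, "q1")); // prints [q2]
    }
}
```

Iterating the list backwards by index would also work. removeIf is used here simply because it is the shortest form.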
Package the class as a jar and upload it to HDFS.
A simple test run is shown below.
create 'cp_test','f'
put 'cp_test','rk1','f:q1','123'
put 'cp_test','rk1','f:q2','123'
put 'cp_test','rk2','f:q1','123'
put 'cp_test','rk2','f:q2','123'
put 'cp_test','rk2','f:q3','123'
hbase(main):015:0> scan 'cp_test'
ROW      COLUMN+CELL
 rk1     column=f:q1, timestamp=1590567958995, value=123
 rk1     column=f:q2, timestamp=1590567959023, value=123
 rk2     column=f:q1, timestamp=1590567959048, value=123
 rk2     column=f:q2, timestamp=1590567959073, value=123
 rk2     column=f:q3, timestamp=1590567959842, value=123
alter 'cp_test' \
, METHOD => 'table_att', 'coprocessor'=>'hdfs://xxx.jar|xxx.QualifierDelexxxtingObserver|1024|qualifier.to.delete=q1'
flush 'cp_test'
major_compact 'cp_test'
hbase(main):017:0> scan 'cp_test'
ROW      COLUMN+CELL
 rk1     column=f:q2, timestamp=1590567959023, value=123
 rk2     column=f:q2, timestamp=1590567959073, value=123
 rk2     column=f:q3, timestamp=1590567959842, value=123
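The 'coprocessor' attribute passed to alter in the test above packs four fields separated by '|': the jar path, the class name, the priority, and a comma-separated list of key=value arguments. Those arguments are what the observer reads in start through getConfiguration(). The sketch below only illustrates that layout. The class CoprocessorSpec is a hypothetical helper, not how HBase parses the attribute, and the jar path and class name in main are made-up placeholders. It assumes the first three fields are always present.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CoprocessorSpec {
    final String jarPath;
    final String className;
    final int priority;
    final Map<String, String> args = new LinkedHashMap<>();

    CoprocessorSpec(String spec) {
        // Layout assumed here: jarPath|className|priority|k1=v1,k2=v2
        String[] parts = spec.split("\\|", -1);
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("expected 3 or 4 '|'-separated fields: " + spec);
        }
        jarPath = parts[0].trim();
        className = parts[1].trim();
        priority = Integer.parseInt(parts[2].trim());
        if (parts.length == 4) {
            for (String kv : parts[3].split(",")) {
                int eq = kv.indexOf('=');
                if (eq > 0) { // ignore empty or malformed entries
                    args.put(kv.substring(0, eq).trim(), kv.substring(eq + 1).trim());
                }
            }
        }
    }

    public static void main(String[] argv) {
        // Placeholder values for illustration only.
        CoprocessorSpec spec = new CoprocessorSpec(
            "hdfs:///tmp/demo.jar|com.example.DemoObserver|1024|qualifier.to.delete=q1");
        System.out.println(spec.className + " priority=" + spec.priority);
        System.out.println("qualifier.to.delete=" + spec.args.get("qualifier.to.delete"));
    }
}
```

In the test, the only argument is qualifier.to.delete=q1, which is why the q1 cells are gone from the second scan while q2 and q3 remain.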