优点:
如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
限制:
仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群
Bulk Loading 方法的主要原理是利用HBase里面的数据最终会以HFile的形式存储在hdfs上的特点,绕过hbase原先利用put写入数据的过程,直接将数据以HFile的形式写入hdfs中,再让HBase加载HFile文件,从而在hbase中完成巨量数据的一次性入库。而具体实现Bulk Loading的类就是HBase提供的HFileOutputFormat类以及LoadIncrementalHFiles类
主要步骤分为两步,第一步是在hdfs中生成HFile文件(借助HFileOutputFormat类),第二步是将生成的HFile文件加载到HBase的Region里面,即HFile文件入库到HBase中( 借助LoadIncrementalHFiles类)
需要说明的是,一开始生成HFile文件是利用MapReduce任务生成,存储在hdfs中。而HBase的HFile也是存储在hdfs里面。但是不能直接将MapReduce生成的HFile文件移动到HBase的目录下面,因为这样不会更新HBase的元数据,所以需要利用 LoadIncrementalHFiles的doBulkLoad方法完成HFile文件的入库HBase
查看一下HFileOutputFormat类的使用说明
-
Sets the number of reduce tasks to match the current number of regions
Bulk Loading底层调用的是MapReduce任务,因此这句话的含义是设置的reduce任务的数量应该和要写入的表的region数量相同 -
Sets the output key/value class to match HFileOutputFormat2’s requirements
设置合适的输出的key和value的类型 -
Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
设置合理的排序,对于reduce来说,reduce之间应该是有序的,reduce内的数据也应该是有序的
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);保证reduce之间有序,而且数据不会重叠
job.setReducerClass(KeyValueSortReducer.class);保证reduce内部有序
类的介绍中给出了两个排序class,分别是KeyValueSortReducer or PutSortReducer,根据自己的代码选择具体用哪个 -
The user should be sure to set the mapy output value class to either KeyValue or Put before runningthis function.
Map输出的value数据类型应该是 KeyValue 类型 或者 Put 类型
设置合适的输出的key和value的类型,因为Bulk Loading是从hdfs中读取数据,所以Mapper的输入key和value的类型是,LongWritable和Text。而Mapper的输出key和value则与以前不一样。输出的key应该是rowkey类型,也即ImmutableBytesWritable类型,而输出的value可以是KeyValue类型或者Put类型。
输出的key是rowkey类型,是为了方便排序与分区,因为MapTask输出的数据会经过分区排序的过程后再发送给reduce端,为了保证reduce端处理的数据不能重复,而且有序,因此使用rowkey作为Mapper的输出key
假设要写入的表总共有3个region,那么就应该有3个reduce,而这3个reduce处理的数据应该是rowkey0-10,rowkey11-20,rowkey21-30这种,保证每个reduce处理的数据都不会相同。
而对于输出的value,KeyValue类型的含义是一个单元格cell的值,Put类型的含义是一条数据的值。根据需要选择不同的输出类型。
具体实现:
package Demo.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class bulkLoad {
public static class BulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
String mdn = split[0];
String start_time = split[1];
//经度
String longitude = split[4];
//纬度
String latitude = split[5];
//设置要写入表中的rowkey值
String rowkey = mdn+"_"+start_time;
//设置要写入表中的经纬度的值,由于KeyValue传入的是单元格cell的值
//所以通过rowkey+列簇+列名的形式确定一个单元格,然后传入具体的单元格里面的值
KeyValue lgKV = new KeyValue(rowkey.getBytes(),"info".getBytes(),"lg".getBytes(),longitude.getBytes());
KeyValue latKV = new KeyValue(rowkey.getBytes(),"info".getBytes(),"lat".getBytes(),latitude.getBytes());
context.write(new ImmutableBytesWritable(rowkey.getBytes()),latKV);
context.write(new ImmutableBytesWritable(rowkey.getBytes()),latKV);
}
}
//因为这里没有计算的需求,所以Reducer的代码可以省略不写
//Driver端
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
Job job = Job.getInstance(conf);
job.setJobName("bulkLoad");
job.setJarByClass(bulkLoad.class);
//此时不能再在Driver端设置reduce的数量,因为reduce的数量与表里面的region数量一致
//即使在这里写了设置语句,也不会生效
//配置Map
job.setMapperClass(BulkLoadMapper.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
//保证不同的reduce处理的数据不会有重叠,并且reduce之间是有序的
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
//配置reduce
//保证在Reduce内部的数据之间是有序的
job.setReducerClass(KeyValueSortReducer.class);
//配置输入输出路径
FileInputFormat.addInputPath(job,new Path("/bulk_load/input"));
Path outputPath = new Path("/bulk_load/output");
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(outputPath)){
fileSystem.delete(outputPath,true);
}
FileOutputFormat.setOutputPath(job,outputPath);
//配置Hfile文件的生成与加载入库Hbase
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
//获取RegionLocator对象
//因为不同的HFile文件属于不同的region,RegionLocator对象就是用来告诉HFile文件应该去哪个Region的
RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));
//第一步,生成HFile
// 使用HFileOutputFormat2将输出的数据格式化为HFile文件
//这个方法需要传入三个参数,分别是job,表名,RegionLocator对象
HFileOutputFormat2.configureIncrementalLoad(
job,
dianxin_bulk,
regionLocator
);
//等待第一步的HFile文件写入完成,因为调用MapReduce任务是为了生成HFile文件
//所以第二步加载数据,应该在job.waitForCompletion之后,即任务完成后再加载HFile文件入库
boolean flag = job.waitForCompletion(true);
if(flag){
//第二步,加载HFile到HBase中
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
//需要传入方法的参数依次是输出路径,Admin对象,表名,RegionLocator对象
load.doBulkLoad(
outputPath,
admin,
dianxin_bulk,
regionLocator
);
}else{
System.out.println("MapReduce任务运行失败");
}
}
}
执行程序之前,先到HBase中创建一张预分区的表
hbase(main):001:0> create 'dianxin_bulk','info',{SPLITS=>['1|','3|','5|','7|','9|','B|','D|']}
0 row(s) in 5.4560 seconds
=> Hbase::Table - dianxin_bulk
然后在hdfs中创建input文件,然后将数据集传入input文件夹中
hdfs dfs -mkdir -p /bulk_load/input
hdfs dfs -put /usr/local/data/DIANXIN.csv /bulk_load/input
然后将项目文件打包传到linux本地
运行jar包里面的指定程序
hadoop jar hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar Demo.hbase.bulkLoad
程序运行结束后,到可视化页面查看一下
补充:
MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。