BulkLoading can be used to import large volumes of data into an HBase table, and it is much faster than writing through the Java API as in the earlier examples.
Advantages:
Loading a huge amount of data into HBase in one go through the normal write path is not only slow, it also puts heavy pressure on Region resources. A more efficient and convenient approach is "Bulk Loading", i.e. the HFileOutputFormat class provided by HBase (HFileOutputFormat2 in the code below).
It relies on the fact that HBase stores its data in HDFS in a specific file format (HFile): the job generates files directly in that format and then moves them to the proper location, which completes the fast import of massive data. Implemented with MapReduce, it is efficient and convenient, and it neither consumes Region resources nor adds extra write load on the RegionServers.
Limitations:
It is only suitable for the initial data load, i.e. the table is empty, or the table contains no data before each load.
The HBase cluster and the Hadoop cluster must be the same cluster, i.e. the HDFS that HBase stores its data on is the same HDFS used by the MapReduce job that generates the HFiles.
Generating the HFiles
The number of reducers is determined by the number of regions in the target table.
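HFileOutputFormat2.configureIncrementalLoad creates one reducer per region of the target table, so a table created with a single region will only get one reducer no matter what setNumReduceTasks requests. To let the HFile-generation job run in parallel, the table can be pre-split when it is created. Below is a minimal sketch, assuming the same students table and info column family used in the code that follows; the class name CreatePreSplitTable and the split keys "3000"/"6000"/"9000" are hypothetical and should be chosen to match the real rowkey distribution.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class CreatePreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("students"));
            descriptor.addFamily(new HColumnDescriptor("info"));
            // Hypothetical split points; pick values that match your rowkey distribution
            byte[][] splitKeys = {
                    Bytes.toBytes("3000"),
                    Bytes.toBytes("6000"),
                    Bytes.toBytes("9000")
            };
            // 3 split keys -> 4 initial regions -> 4 reducers in the HFile-generation job
            admin.createTable(descriptor, splitKeys);
        }
    }
}
```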
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Demo01 {

    public static class BulkLoadingMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line: id,name,age,gender,clazz
            String[] split = value.toString().split(",");
            String row_key_id = split[0];
            String name = split[1];
            String age = split[2];
            String gender = split[3];
            String clazz = split[4];
            // Build one KeyValue per column under the "info" column family
            KeyValue value1 = new KeyValue(row_key_id.getBytes(), "info".getBytes(), "name".getBytes(), name.getBytes());
            KeyValue value2 = new KeyValue(row_key_id.getBytes(), "info".getBytes(), "age".getBytes(), age.getBytes());
            KeyValue value3 = new KeyValue(row_key_id.getBytes(), "info".getBytes(), "gender".getBytes(), gender.getBytes());
            KeyValue value4 = new KeyValue(row_key_id.getBytes(), "info".getBytes(), "clazz".getBytes(), clazz.getBytes());
            // The map output key is the rowkey, so records for the same row go to the same reducer
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row_key_id.getBytes());
            context.write(rowKey, value1);
            context.write(rowKey, value2);
            context.write(rowKey, value3);
            context.write(rowKey, value4);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        // Create the Job
        Job job = Job.getInstance(configuration);
        // Set the jar
        job.setJarByClass(Demo01.class);
        job.setJobName("bulkloading import");
        // Keep the data ordered across reducers,
        // e.g. the first reducer gets rowkeys 1-10, the second 10-20, ...
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
        // Set the number of reducers.
        // If the table was created pre-split this can be omitted: each region is handled by its own reducer,
        // and the number of reducers is determined by the number of regions in the table
        job.setNumReduceTasks(4);
        // Configure the map task
        job.setMapperClass(BulkLoadingMapper.class);
        // Configure the reduce task
        // KeyValueSortReducer keeps the KeyValues sorted within each reducer
        job.setReducerClass(KeyValueSortReducer.class);
        // Input and output paths
        FileInputFormat.addInputPath(job, new Path("/data/students.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/data/hfile"));
        // Create the HBase connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        // Create the students table
        // Admin admin = connection.getAdmin();
        // HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("students"));
        // HColumnDescriptor info = new HColumnDescriptor("info");
        // tableDescriptor.addFamily(info);
        // admin.createTable(tableDescriptor);
        // Get the students table
        Table table = connection.getTable(TableName.valueOf("students"));
        // Get the RegionLocator of the target table
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("students"));
        // Use HFileOutputFormat2 to format the job output as HFiles
        // (this also overrides the partitioner and reducer count to match the table's regions)
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        // Wait for the MapReduce job to complete
        job.waitForCompletion(true);
        // Load the generated HFiles into the HBase table
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("/data/hfile"), connection.getAdmin(), table, regionLocator);
        /**
         * Commands:
         * create 'students','info'
         * hadoop jar HBaseJavaAPI10-1.0-jar-with-dependencies.jar com.shujiad.demo05bulkloading.Demo01
         */
    }
}
```
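Once doBulkLoad returns, the HFiles have been moved into the table's region directories and the rows are visible to normal reads right away. The following is a minimal verification sketch, assuming the same students table, the info:name column written by the mapper above, and the same ZooKeeper quorum; the class name VerifyBulkLoad and the limit of ten rows are arbitrary choices for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyBulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("students"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int printed = 0;
            for (Result result : scanner) {
                String rowKey = Bytes.toString(result.getRow());
                byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
                System.out.println(rowKey + " -> " + (name == null ? "<missing>" : Bytes.toString(name)));
                if (++printed >= 10) {   // only print the first few rows
                    break;
                }
            }
        }
    }
}
```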