HBase快速导入巨量数据—— Bulk Loading

优点:

  1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

  2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

限制:

  1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。

  2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群

Bulk Loading 方法的主要原理是利用HBase里面的数据最终会以HFile的形式存储在hdfs上的特点,绕过hbase原先利用put写入数据的过程,直接将数据以HFile的形式写入hdfs中,再让HBase加载HFile文件,从而在hbase中完成巨量数据的一次性入库。而具体实现Bulk Loading的类就是HBase提供的HFileOutputFormat类以及LoadIncrementalHFiles类

主要步骤分为两步,第一步是在hdfs中生成HFile文件(借助HFileOutputFormat类),第二步是将生成的HFile文件加载到HBase的Region里面,即HFile文件入库到HBase中( 借助LoadIncrementalHFiles类)

需要说明的是,一开始生成HFile文件是利用MapReduce任务生成,存储在hdfs中。而HBase的HFile也是存储在hdfs里面。但是不能直接将MapReduce生成的HFile文件移动到HBase的目录下面,因为这样不会更新HBase的元数据,所以需要利用 LoadIncrementalHFiles的doBulkLoad方法完成HFile文件的入库HBase

查看一下HFileOutputFormat类的使用说明
HBase快速导入巨量数据—— Bulk Loading

  1. Sets the number of reduce tasks to match the current number of regions
    Bulk Loading底层调用的是MapReduce任务,因此这句话的含义是设置的reduce任务的数量应该和要写入的表的region数量相同

  2. Sets the output key/value class to match HFileOutputFormat2’s requirements
    设置合适的输出的key和value的类型

  3. Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
    设置合理的排序,对于reduce来说,reduce之间应该是有序的,reduce内的数据也应该是有序的
    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);保证reduce之间有序,而且数据不会重叠
    job.setReducerClass(KeyValueSortReducer.class);保证reduce内部有序
    类的介绍中给出了两个排序class,分别是KeyValueSortReducer or PutSortReducer,根据自己的代码选择具体用哪个

  4. The user should be sure to set the mapy output value class to either KeyValue or Put before runningthis function.
    Map输出的value数据类型应该是 KeyValue 类型 或者 Put 类型

设置合适的输出的key和value的类型,因为Bulk Loading是从hdfs中读取数据,所以Mapper的输入key和value的类型是,LongWritable和Text。而Mapper的输出key和value则与以前不一样。输出的key应该是rowkey类型,也即ImmutableBytesWritable类型,而输出的value可以是KeyValue类型或者Put类型。

输出的key是rowkey类型,是为了方便排序与分区,因为MapTask输出的数据会经过分区排序的过程后再发送给reduce端,为了保证reduce端处理的数据不能重复,而且有序,因此使用rowkey作为Mapper的输出key
假设要写入的表总共有3个region,那么就应该有3个reduce,而这3个reduce处理的数据应该是rowkey0-10,rowkey11-20,rowkey21-30这种,保证每个reduce处理的数据都不会相同。

而对于输出的value,KeyValue类型的含义是一个单元格cell的值,Put类型的含义是一条数据的值。根据需要选择不同的输出类型。

具体实现:

package Demo.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class bulkLoad {
    public static class BulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            String mdn = split[0];
            String start_time = split[1];
            //经度
            String longitude = split[4];
            //纬度
            String latitude = split[5];
            //设置要写入表中的rowkey值
            String rowkey = mdn+"_"+start_time;
            //设置要写入表中的经纬度的值,由于KeyValue传入的是单元格cell的值
            //所以通过rowkey+列簇+列名的形式确定一个单元格,然后传入具体的单元格里面的值
            KeyValue lgKV = new KeyValue(rowkey.getBytes(),"info".getBytes(),"lg".getBytes(),longitude.getBytes());
            KeyValue latKV = new KeyValue(rowkey.getBytes(),"info".getBytes(),"lat".getBytes(),latitude.getBytes());

            context.write(new ImmutableBytesWritable(rowkey.getBytes()),latKV);
            context.write(new ImmutableBytesWritable(rowkey.getBytes()),latKV);

        }
    }

    //因为这里没有计算的需求,所以Reducer的代码可以省略不写

    //Driver端
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");

        Job job = Job.getInstance(conf);
        job.setJobName("bulkLoad");
        job.setJarByClass(bulkLoad.class);

        //此时不能再在Driver端设置reduce的数量,因为reduce的数量与表里面的region数量一致
        //即使在这里写了设置语句,也不会生效

        //配置Map
        job.setMapperClass(BulkLoadMapper.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        //保证不同的reduce处理的数据不会有重叠,并且reduce之间是有序的
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        //配置reduce
        //保证在Reduce内部的数据之间是有序的
        job.setReducerClass(KeyValueSortReducer.class);

        //配置输入输出路径
        FileInputFormat.addInputPath(job,new Path("/bulk_load/input"));
        Path outputPath = new Path("/bulk_load/output");
        FileSystem fileSystem = FileSystem.get(conf);
        if(fileSystem.exists(outputPath)){
            fileSystem.delete(outputPath,true);
        }
        FileOutputFormat.setOutputPath(job,outputPath);

        //配置Hfile文件的生成与加载入库Hbase
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
        //获取RegionLocator对象
        //因为不同的HFile文件属于不同的region,RegionLocator对象就是用来告诉HFile文件应该去哪个Region的
        RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));

        //第一步,生成HFile
        // 使用HFileOutputFormat2将输出的数据格式化为HFile文件
        //这个方法需要传入三个参数,分别是job,表名,RegionLocator对象
        HFileOutputFormat2.configureIncrementalLoad(
                job,
                dianxin_bulk,
                regionLocator
        );

        //等待第一步的HFile文件写入完成,因为调用MapReduce任务是为了生成HFile文件
        //所以第二步加载数据,应该在job.waitForCompletion之后,即任务完成后再加载HFile文件入库
        boolean flag = job.waitForCompletion(true);

        if(flag){
            //第二步,加载HFile到HBase中
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
            //需要传入方法的参数依次是输出路径,Admin对象,表名,RegionLocator对象
            load.doBulkLoad(
                    outputPath,
                    admin,
                    dianxin_bulk,
                    regionLocator
            );
        }else{
            System.out.println("MapReduce任务运行失败");
        }

    }
}

执行程序之前,先到HBase中创建一张预分区的表

hbase(main):001:0> create 'dianxin_bulk','info',{SPLITS=>['1|','3|','5|','7|','9|','B|','D|']}
0 row(s) in 5.4560 seconds
=> Hbase::Table - dianxin_bulk

然后在hdfs中创建input文件,然后将数据集传入input文件夹中

hdfs dfs -mkdir -p /bulk_load/input
hdfs dfs -put /usr/local/data/DIANXIN.csv /bulk_load/input

然后将项目文件打包传到linux本地
运行jar包里面的指定程序

hadoop jar hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar Demo.hbase.bulkLoad

程序运行结束后,到可视化页面查看一下
HBase快速导入巨量数据—— Bulk Loading
补充:

MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。

上一篇:HBase深入----读写流程


下一篇:Hbase总结