I. Integrating HBase with MapReduce
1. List the JARs required to integrate HBase with MapReduce
[root@hadoop-senior hbase-0.98.6-hadoop2]# bin/hbase mapredcp
2019-05-22 16:23:46,814 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-common-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/protobuf-java-2.5.0.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-client-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-protocol-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/high-scale-lib-1.1.1.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/zookeeper-3.4.5.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/guava-12.0.1.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/htrace-core-2.04.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/netty-3.6.6.Final.jar
2. Start YARN and run the MapReduce programs bundled with HBase
## Start YARN
[root@hadoop-senior hadoop-2.5.0]# sbin/yarn-daemon.sh start nodemanager
[root@hadoop-senior hadoop-2.5.0]# sbin/mr-jobhistory-daemon.sh start historyserver

## The MapReduce programs that ship with HBase are all in hbase-server-0.98.6-hadoop2.jar, and they are quite useful
[root@hadoop-senior hbase-0.98.6-hadoop2]# export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
[root@hadoop-senior hbase-0.98.6-hadoop2]# export HADOOP_HOME=/opt/modules/hadoop-2.5.0
[root@hadoop-senior hbase-0.98.6-hadoop2]# HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar
An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
  verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

#####
TSV: fields separated by tabs
>> student.tsv
1001	zhangsan	26	shanghai

CSV: fields separated by commas
>> student.csv
1001,zhangsan,26,shanghai
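To make the difference between the two sample files concrete, here is a minimal Java sketch that splits one record from each format into fields. The class name DelimitedLineDemo and the helper splitFields are made up for illustration and are not part of HBase. The plain split shown here also assumes that no field contains the delimiter or quotes, which real CSV data does not guarantee.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class; not part of HBase or Hadoop.
public class DelimitedLineDemo {

    // Split one record on a literal delimiter.
    // The -1 limit keeps trailing empty fields instead of dropping them.
    public static List<String> splitFields(String line, String delimiter) {
        return Arrays.asList(line.split(Pattern.quote(delimiter), -1));
    }

    public static void main(String[] args) {
        String tsvLine = "1001\tzhangsan\t26\tshanghai"; // a line of student.tsv
        String csvLine = "1001,zhangsan,26,shanghai";    // a line of student.csv

        // Both lines carry the same four fields; only the separator differs.
        System.out.println(splitFields(tsvLine, "\t")); // [1001, zhangsan, 26, shanghai]
        System.out.println(splitFields(csvLine, ","));  // [1001, zhangsan, 26, shanghai]
    }
}
```

Because the two formats differ only in the separator, a file in either format can be read with the same logic once the delimiter is known.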
II. Writing a MapReduce program that reads from and writes to HBase tables
1. Prepare the data
## Prepare two tables: user, which contains data, and basic, which is empty
hbase(main):004:0> create 'basic', 'info'
0 row(s) in 0.4290 seconds

=> Hbase::Table - basic

hbase(main):005:0> list
TABLE
sub_user
user
2 row(s) in 0.0290 seconds

=> ["basic", "user"]

hbase(main):003:0> scan 'user'
ROW                  COLUMN+CELL
 10002               column=info:age, timestamp=1558343570256, value=30
 10002               column=info:name, timestamp=1558343559457, value=wangwu
 10002               column=info:qq, timestamp=1558343612746, value=231294737
 10002               column=info:tel, timestamp=1558343607851, value=231294737
 10003               column=info:age, timestamp=1558577830484, value=35
 10003               column=info:name, timestamp=1558345826709, value=zhaoliu
 10004               column=info:address, timestamp=1558505387829, value=shanghai
 10004               column=info:age, timestamp=1558505387829, value=25
 10004               column=info:name, timestamp=1558505387829, value=zhaoliu
3 row(s) in 0.0190 seconds

hbase(main):006:0> scan 'basic'
ROW                  COLUMN+CELL
0 row(s) in 0.0100 seconds
2. Write a MapReduce job that imports the data in the user table into the basic table
package com.beifeng.senior.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2BasicMapReduce extends Configured implements Tool {

    // Mapper class: reads each row of the user table and keeps info:name and info:age
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                throws IOException, InterruptedException {
            // get rowkey
            String rowkey = Bytes.toString(key.get());

            // set the map output key
            mapOutputKey.set(rowkey);

            // build a Put with the same rowkey for the target table
            Put put = new Put(key.get());

            // iterate over all cells of the row
            for (Cell cell : value.rawCells()) {
                // add family: info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column: name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    // add column: age
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }

            // context write; skip rows with neither column, because an empty Put is rejected on write
            if (!put.isEmpty()) {
                context.write(mapOutputKey, put);
            }
        }
    }

    // Reducer class: writes every Put it receives to the basic table
    public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(null, put);
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());

        // set job
        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(
                "user",               // input table
                scan,                 // Scan instance to control CF and attribute selection
                ReadUserMapper.class, // mapper class
                Text.class,           // mapper output key
                Put.class,            // mapper output value
                job);

        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(
                "basic",                 // output table
                WriteBasicReducer.class, // reducer class
                job);

        job.setNumReduceTasks(1); // at least one, adjust as required

        // submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();

        // submit job
        int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);

        // exit program
        System.exit(status);
    }
}
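The filtering done in the mapper can be checked without a cluster. The sketch below is a plain-Java stand-in, not HBase code: a row is modelled as a map from "family:qualifier" to value, and the class name ColumnFilterSketch and the method keepWanted are hypothetical names chosen for this illustration. It only mirrors the rule "keep info:name and info:age, drop everything else".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the mapper's column selection; uses no HBase classes.
public class ColumnFilterSketch {

    // The two columns the job copies from user to basic.
    private static final Set<String> WANTED =
            new HashSet<>(Arrays.asList("info:name", "info:age"));

    // Return only the wanted columns of one row, preserving their order.
    public static Map<String, String> keepWanted(Map<String, String> row) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (Map.Entry<String, String> cell : row.entrySet()) {
            if (WANTED.contains(cell.getKey())) {
                kept.put(cell.getKey(), cell.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Row 10002 as shown in the scan of the user table.
        Map<String, String> row = new LinkedHashMap<>();
        row.put("info:age", "30");
        row.put("info:name", "wangwu");
        row.put("info:qq", "231294737");
        row.put("info:tel", "231294737");

        System.out.println(keepWanted(row)); // {info:age=30, info:name=wangwu}
    }
}
```

For row 10002 this leaves only info:age and info:name, which is what the basic table is expected to contain for that row once the job has run.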
3. Run the job
## Build the jar and upload it to $HADOOP_HOME/jars/

## Run
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar

## Check the result
hbase(main):004:0> scan 'basic'
ROW                  COLUMN+CELL
 10002               column=info:age, timestamp=1558343570256, value=30
 10002               column=info:name, timestamp=1558343559457, value=wangwu
 10003               column=info:age, timestamp=1558577830484, value=35
 10003               column=info:name, timestamp=1558345826709, value=zhaoliu
 10004               column=info:age, timestamp=1558505387829, value=25
 10004               column=info:name, timestamp=1558505387829, value=zhaoliu
3 row(s) in 0.0300 seconds