2.8-2.10 Integrating HBase with MapReduce

I. Integrating HBase with MapReduce

1. List the JARs HBase needs for MapReduce integration

[root@hadoop-senior hbase-0.98.6-hadoop2]# bin/hbase mapredcp
2019-05-22 16:23:46,814 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-common-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/protobuf-java-2.5.0.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-client-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-protocol-0.98.6-hadoop2.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/high-scale-lib-1.1.1.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/zookeeper-3.4.5.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/guava-12.0.1.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/htrace-core-2.04.jar:
/opt/modules/hbase-0.98.6-hadoop2/lib/netty-3.6.6.Final.jar
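
The output of `bin/hbase mapredcp` is an ordinary colon-separated classpath string, which is why it can later be assigned directly to HADOOP_CLASSPATH. As a minimal sketch (plain Java, no HBase dependency), the snippet below splits such a string into JAR file names. The class name `ClasspathEntriesSketch` and the method `jarNames` are hypothetical, and the `:` separator assumes a Unix-like system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: not part of HBase or Hadoop.
class ClasspathEntriesSketch {

    // Split a colon-separated classpath and return only the file names.
    static List<String> jarNames(String classpath) {
        List<String> names = new ArrayList<String>();
        for (String entry : classpath.split(":")) {
            String path = entry.trim();          // tolerate line breaks after each colon
            if (path.isEmpty()) {
                continue;                        // skip empty entries
            }
            names.add(path.substring(path.lastIndexOf('/') + 1));
        }
        return names;
    }

    public static void main(String[] args) {
        // Two entries copied from the mapredcp output above.
        String classpath =
                "/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-common-0.98.6-hadoop2.jar:\n"
              + "/opt/modules/hbase-0.98.6-hadoop2/lib/protobuf-java-2.5.0.jar";
        for (String name : jarNames(classpath)) {
            System.out.println(name);
        }
    }
}
```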


2. Run the MapReduce programs bundled with HBase

## Start YARN
[root@hadoop-senior hadoop-2.5.0]# sbin/yarn-daemon.sh start nodemanager
[root@hadoop-senior hadoop-2.5.0]# sbin/mr-jobhistory-daemon.sh start historyserver



## The MapReduce programs that ship with HBase all live in hbase-server-0.98.6-hadoop2.jar, and they are quite useful

[root@hadoop-senior hbase-0.98.6-hadoop2]# export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
[root@hadoop-senior hbase-0.98.6-hadoop2]# export HADOOP_HOME=/opt/modules/hadoop-2.5.0
[root@hadoop-senior hbase-0.98.6-hadoop2]# HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar

An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
  verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.



#####
TSV
    tab-separated values (each gap in the sample line below stands for one tab character)
    >>student.tsv
    1001 zhangsan 26 shanghai

CSV
    comma-separated values
    >>student.csv
    1001,zhangsan,26,shanghai
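
The only difference between the two formats is the field delimiter. The sketch below (plain Java, no HBase dependency) splits the two sample lines above into the same four fields. The class name `DelimitedLineDemo` and the method `splitLine` are hypothetical. It assumes fields contain no quoted or escaped delimiters, which real CSV files may have.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
class DelimitedLineDemo {

    // Split one line on a single-character delimiter.
    // The limit -1 keeps trailing empty fields instead of dropping them.
    static List<String> splitLine(String line, char delimiter) {
        String regex = Pattern.quote(String.valueOf(delimiter));
        return Arrays.asList(line.split(regex, -1));
    }

    public static void main(String[] args) {
        String tsvLine = "1001\tzhangsan\t26\tshanghai";   // student.tsv
        String csvLine = "1001,zhangsan,26,shanghai";      // student.csv

        System.out.println(splitLine(tsvLine, '\t'));
        System.out.println(splitLine(csvLine, ','));
    }
}
```

Both calls produce the same list of fields: 1001, zhangsan, 26, shanghai.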


II. Writing a MapReduce program that reads from and writes to HBase tables

1. Prepare the data

## Prepare two tables: user (contains data) and basic (empty)
hbase(main):004:0> create 'basic', 'info'
0 row(s) in 0.4290 seconds
=> Hbase::Table - basic


hbase(main):005:0> list
TABLE                                                                                                                                                                       
sub_user                                                                                                                                                                    
user                                                                                                                                                                        
2 row(s) in 0.0290 seconds
=> ["basic", "user"]


hbase(main):003:0> scan 'user'
ROW                                          COLUMN+CELL                                                                                                                    
 10002                                       column=info:age, timestamp=1558343570256, value=30                                                                             
 10002                                       column=info:name, timestamp=1558343559457, value=wangwu                                                                        
 10002                                       column=info:qq, timestamp=1558343612746, value=231294737                                                                       
 10002                                       column=info:tel, timestamp=1558343607851, value=231294737                                                                      
 10003                                       column=info:age, timestamp=1558577830484, value=35                                                                             
 10003                                       column=info:name, timestamp=1558345826709, value=zhaoliu                                                                       
 10004                                       column=info:address, timestamp=1558505387829, value=shanghai                                                                   
 10004                                       column=info:age, timestamp=1558505387829, value=25                                                                             
 10004                                       column=info:name, timestamp=1558505387829, value=zhaoliu                                                                       
3 row(s) in 0.0190 seconds


hbase(main):006:0> scan 'basic'
ROW                                          COLUMN+CELL                                                                                                                    
0 row(s) in 0.0100 seconds


2. Write a MapReduce job that imports the data from the user table into the basic table

package com.beifeng.senior.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2BasicMapReduce extends Configured implements Tool {
    
    // Mapper Class
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                        throws IOException, InterruptedException {
            // get rowkey
            String rowkey = Bytes.toString(key.get());

            // set
            mapOutputKey.set(rowkey);

            // --------------------------------------------------------
            Put put = new Put(key.get());

            // iterator
            for (Cell cell : value.rawCells()) {
                // add family : info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column: name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                    // add column : age
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }

            // context write
            context.write(mapOutputKey, put);
        }

    }

    // Reducer Class
    public static class WriteBasicReducer extends TableReducer<Text, Put, //
    ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                        throws IOException, InterruptedException {
            for(Put put: values){
                context.write(null, put);
            }
        }

    }

    // Driver
    public int run(String[] args) throws Exception {
        
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        
        // set run job class
        job.setJarByClass(this.getClass());
        
        // set job
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(
          "user",        // input table
          scan,               // Scan instance to control CF and attribute selection
          ReadUserMapper.class,     // mapper class
          Text.class,         // mapper output key
          Put.class,  // mapper output value
          job //
         );
        
        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(
          "basic",        // output table
          WriteBasicReducer.class,    // reducer class
          job//
         );
        
        job.setNumReduceTasks(1);   // at least one, adjust as required
        
        // submit job
        boolean isSuccess = job.waitForCompletion(true) ;
        
        
        return isSuccess ? 0 : 1;
    }
    
    
    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();
        
        // submit job
        int status = ToolRunner.run(configuration,new User2BasicMapReduce(),args) ;
        
        // exit program
        System.exit(status);
    }

}
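
The key step in ReadUserMapper is the column filter: from each row of the user table, only info:name and info:age are copied into the Put, so info:qq, info:tel and info:address never reach the basic table. The sketch below simulates that filter in plain Java, without the HBase API. A row is modelled as a map from "family:qualifier" to value, and the class `ColumnFilterSketch` and method `keepNameAndAge` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation of the mapper's filter; this is not HBase code.
class ColumnFilterSketch {

    // Keep only the info:name and info:age cells of one row.
    static Map<String, String> keepNameAndAge(Map<String, String> row) {
        Map<String, String> kept = new LinkedHashMap<String, String>();
        for (Map.Entry<String, String> cell : row.entrySet()) {
            String column = cell.getKey();   // "family:qualifier"
            if ("info:name".equals(column) || "info:age".equals(column)) {
                kept.put(column, cell.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Row 10002 of the user table, as shown in the scan output earlier.
        Map<String, String> row10002 = new LinkedHashMap<String, String>();
        row10002.put("info:age", "30");
        row10002.put("info:name", "wangwu");
        row10002.put("info:qq", "231294737");
        row10002.put("info:tel", "231294737");

        // prints {info:age=30, info:name=wangwu}
        System.out.println(keepNameAndAge(row10002));
    }
}
```

In the real job the filter runs once per row inside map(), and the reducer simply forwards each resulting Put to the output table.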


3. Run the job

## Build the JAR and upload it to $HADOOP_HOME/jars/

## Run
export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar


## Check the result
hbase(main):004:0> scan 'basic'
ROW                                          COLUMN+CELL                                                                                                                    
 10002                                       column=info:age, timestamp=1558343570256, value=30                                                                             
 10002                                       column=info:name, timestamp=1558343559457, value=wangwu                                                                        
 10003                                       column=info:age, timestamp=1558577830484, value=35                                                                             
 10003                                       column=info:name, timestamp=1558345826709, value=zhaoliu                                                                       
 10004                                       column=info:age, timestamp=1558505387829, value=25                                                                             
 10004                                       column=info:name, timestamp=1558505387829, value=zhaoliu                                                                       
3 row(s) in 0.0300 seconds