6.1.8. HBase: operating HBase with MapReduce — dependencies, build plugins, TableMapper, TableReducer, configuring the map task with the TableMapReduceUtil helper class

1. MapReduce can operate on HBase: write the MapReduce job in Java, package it into a jar, and run it on Hadoop.


Each map task corresponds to one region. The input cannot be split directly on HDFS, because part of the data still sits in the MemStore, so a full table scan is required and the data is fetched with a Scan. The map input key is the row key; the value is a Result holding every cell of one row.
TextInputFormat cannot be used to read the data; only TableInputFormat can connect to HBase and fetch it.
To write the output to HDFS, use TextOutputFormat; to write it back to HBase, use TableOutputFormat.
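Because every mapper performs a scan over its region, it is usually worth tuning the Scan before handing it to TableMapReduceUtil. A minimal sketch: the caching value of 500 and the info family are illustrative assumptions, not requirements.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Build a Scan tuned for a full-table MapReduce read; pass the result to
// TableMapReduceUtil.initTableMapperJob instead of a bare new Scan().
public static Scan buildMrScan() {
    Scan scan = new Scan();
    scan.setCaching(500);          // rows fetched per RPC; tune per cluster
    scan.setCacheBlocks(false);    // don't evict hot data from the block cache during a full scan
    scan.addFamily(Bytes.toBytes("info"));  // only ship the family the job actually reads
    return scan;
}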

2. Dependencies

Add the hbase-server dependency so the job can read HBase data directly:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.6</version>
</dependency>

Also declare the two build plugins below in pom.xml. Without them the MapReduce job written in Java cannot run: the jar is not recognized unless it is compiled for the right Java version and packaged together with its dependencies.

<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>


        <!-- assembly plugin: package a jar together with all dependencies -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
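With these two plugins in place, mvn clean package produces a *-jar-with-dependencies.jar under target/ that bundles the HBase client classes, and the job can be submitted with hadoop jar, for example: hadoop jar target/hbase-mr-1.0-jar-with-dependencies.jar Demo01MrRead (the jar name here is an assumption; it depends on your project's artifactId and version).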

3. Examples

Example 1: count the number of students in each class and write the result to HDFS.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Count the number of students in each class and write the result to HDFS
public class Demo01MrRead {
    /**
     * Mapper: extend TableMapper. Only the two output types need to be given;
     * the input types are fixed to ImmutableBytesWritable (row key) and Result.
     */
    public static class MrMap extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // key: the row key; value: a Result holding every cell of the row
            String row_key = Bytes.toString(key.get());  // not used further, kept for illustration
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    /**
     * Reducer: an ordinary MapReduce Reducer, since the output goes to HDFS
     */
    public static class MrReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Same contract as plain MapReduce: map-phase kv in, final kv out
            // key: the clazz emitted by the map phase; values: the 1s to sum into the class size
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * Driver
     */
    public static void main(String[] args) throws Exception {
        // new Configuration() would also work, but HBaseConfiguration.create() also loads the HBase settings
        Configuration conf = HBaseConfiguration.create();
        // If hbase-site.xml is not on the classpath, set hbase.zookeeper.quorum here (see the second example)
        //1、创建一个Job
        Job job = Job.getInstance(conf);
        job.setJobName("Hbase_mr1_clazz_count");
        //2、设置Job的Jar
        job.setJarByClass(Demo01MrRead.class);
        // 3. Configure the map task with the TableMapReduceUtil helper class.
        // Because the input comes from HBase, the table to scan, columns, etc. must be configured.
        // The new Scan() can also carry filters and other options.
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students"),
                new Scan(), MrMap.class, Text.class, IntWritable.class, job);
        // 4. Configure the reduce task
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. Set the output path (the input is the HBase table configured above)
        FileOutputFormat.setOutputPath(job, new Path("/hbaseMr/clazz_num"));
        // 6. Run the job
        job.waitForCompletion(true);
    }
}
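If the job succeeds, the per-class counts land in /hbaseMr/clazz_num and can be inspected with hdfs dfs -cat /hbaseMr/clazz_num/part-r-00000. Note that MapReduce refuses to start if the output directory already exists, so delete it before re-running.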

Example 2: count the number of students in each class and write the result back to HBase. Create the target table first (for example create 'mr_res','info' in the HBase shell; the reducer below writes into the info family), then use TableMapReduceUtil to point the job's output at the HBase table.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Count the number of students in each class and store the result in an HBase table
public class Demo02MrRead {
    public static class MrMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    // TableReducer fixes the output value type to a Mutation (a Put here);
    // the third type parameter is only the output key
    public static class MrReduce extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            // Use the class name as the row key. Text.getBytes() returns the
            // internal buffer, which can be longer than getLength(), so build
            // the byte[] from the String instead.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // The count is stored as a 4-byte binary int (Bytes.toBytes(int))
            put.addColumn("info".getBytes(), "num".getBytes(), Bytes.toBytes(count));
            context.write(NullWritable.get(), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum of the HBase cluster; give each host its client port
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo02MrRead.class);

        // Map side: scan the students table
        TableMapReduceUtil.initTableMapperJob("students",
                new Scan(), MrMapper.class,
                Text.class,
                IntWritable.class,
                job);

        // Reduce side: write the Puts into the mr_res table (must already exist)
        TableMapReduceUtil.initTableReducerJob(
                "mr_res",
                MrReduce.class,
                job
        );

        job.waitForCompletion(true);
    }
}
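Because the reducer stores each count with Bytes.toBytes(int), the value sits in HBase as a 4-byte binary integer and looks garbled in the HBase shell; decoding it from Java with Bytes.toInt recovers the number. A minimal read-back sketch, assuming the same cluster settings as above; the class name Demo03ReadResult and the row key clazz01 are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class Demo03ReadResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("mr_res"))) {
            // The row key is the class name written by the reducer; "clazz01" is a placeholder
            Get get = new Get(Bytes.toBytes("clazz01"));
            Result result = table.get(get);
            byte[] raw = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("num"));
            if (raw != null) {
                // The count was written with Bytes.toBytes(int), so decode with Bytes.toInt
                System.out.println("clazz01 -> " + Bytes.toInt(raw));
            }
        }
    }
}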