1、MapReduce can operate on HBase: write the MapReduce job in Java, package it, and run it on Hadoop.
Each map task corresponds to one region. The input cannot be split the way plain HDFS files are, because part of the data still sits in the MemStore, so the job runs a full table scan and uses a Scan to fetch the data. k: row key, v: a Result holding every cell of that row.
TextInputFormat cannot read the data; only TableInputFormat can connect to the table and read it.
To write results to HDFS use TextOutputFormat; to write back to HBase use TableOutputFormat.
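A minimal sketch of the Scan handed to TableInputFormat. setCaching and setCacheBlocks are the settings the TableMapReduceUtil javadoc recommends for MapReduce scans; the caching value and the info family here are illustrative assumptions, not part of the examples below:
Scan scan = new Scan();
scan.setCaching(500);               // fetch 500 rows per RPC instead of the default (assumed value)
scan.setCacheBlocks(false);         // a full-table MR scan should not pollute the block cache
scan.addFamily("info".getBytes());  // only read the column family the job actually needs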
2、Dependencies
Add the hbase-server dependency so the job can read HBase data directly, and keep the build plugins below in pom.xml; without them the Java MapReduce job cannot run, because the plugin configuration is never picked up.
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.4.6</version>
</dependency>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- jar-with-dependencies plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
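After mvn package, ship the assembled jar to the cluster and launch it with hadoop jar; the jar name below is a placeholder for whatever your build actually produces:
hadoop jar hbase-mr-1.0-jar-with-dependencies.jar Demo01MrRead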
3、Examples
Example 1: count the number of students in each class and write the result to HDFS.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo01MrRead {
/**
* Map class: extends TableMapper; only the two map output types need to be declared
*/
public static class MrMap extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//parameters: key = the row key, value = the Result holding one whole row
String rowKey = Bytes.toString(key.get());  // the row key, unused in this job
String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
context.write(new Text(clazz), new IntWritable(1));
}
}
/**
* Reduce class: a plain MapReduce Reducer, since the result goes to HDFS
*/
public static class MrReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//same contract as a normal MapReduce reducer: kv in from the map phase, final kv out
//key: the clazz emitted by the map phase; values: the 1s that sum to the class size
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
/**
* Driver
*/
public static void main(String[] args) throws Exception {
//you could also use new Configuration() directly
Configuration conf = HBaseConfiguration.create();
//extra settings (e.g. the ZooKeeper quorum, set explicitly in Example 2) can be skipped here
//1. create a Job
Job job = Job.getInstance(conf);
job.setJobName("Hbase_mr1: students per class");
//2. set the job's jar
job.setJarByClass(Demo01MrRead.class);
//3. set up the map task with the TableMapReduceUtil helper class
//the input comes from HBase, so configure the table to scan, the columns, etc.
//new Scan() can also carry filters and other settings (see the sketch after this example)
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students"),
new Scan(), MrMap.class, Text.class, IntWritable.class, job);
//4. set up the reduce task
job.setReducerClass(MrReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//5. set the output path (the input is the students table configured above)
FileOutputFormat.setOutputPath(job, new Path("/hbaseMr/clazz_num"));
//6. run the job
job.waitForCompletion(true);
}
}
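As the comments above note, the Scan passed to initTableMapperJob can carry filters. A hedged sketch that would restrict Example 1 to a single class before the map phase even runs; the "Class01" value is an illustrative assumption about the data, and SingleColumnValueFilter / CompareFilter come from org.apache.hadoop.hbase.filter:
//only scan rows whose info:clazz equals one specific class (illustrative value)
Scan scan = new Scan();
scan.setFilter(new SingleColumnValueFilter(
        "info".getBytes(), "clazz".getBytes(),
        CompareFilter.CompareOp.EQUAL, "Class01".getBytes()));
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students"),
        scan, MrMap.class, Text.class, IntWritable.class, job);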
Example 2: count the students in each class and write the result back to HBase. Create the output table first, then use TableMapReduceUtil to point the job's output at the HBase table.
//count the students in each class and store the result in an HBase table
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Demo02MrRead {
public static class MrMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
context.write(new Text(clazz), new IntWritable(1));
}
}
public static class MrReduce extends TableReducer<Text, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
//use the class name (the map output key) as the row key
//note: Text.getBytes() returns the backing buffer, which can be longer than the valid data, so go through the String instead
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn("info".getBytes(), "num".getBytes(), Bytes.toBytes(count));
context.write(NullWritable.get(), put);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
Job job = Job.getInstance(conf);
job.setJarByClass(Demo02MrRead.class);
TableMapReduceUtil.initTableMapperJob("students",
new Scan(), MrMapper.class,
Text.class,
IntWritable.class,
job);
TableMapReduceUtil.initTableReducerJob(
"mr_res",
MrReduce.class,
job
);
job.waitForCompletion(true);
}
}
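The mr_res table must exist before the job runs (the note above says to create it first; in the hbase shell that is create 'mr_res','info'). A minimal sketch of the same step through the HBase 1.x admin API; the class name CreateMrResTable is hypothetical, and the quorum string mirrors the one in the driver:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateMrResTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("mr_res");
            if (!admin.tableExists(name)) {
                //same column family the reducer writes to (info:num)
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}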