hadoop2的mapreduce操作hbase数据

1、从hbase中取数据,再把计算结果插入hbase中

package com.yeliang;
import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; /**
* 从hbase里取数据,分析完成后插入到hbase里
* @author liang.ye
*
*/
public class FamilyHBase { public static class Map extends TableMapper<Text, IntWritable>{ @Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
for (Cell cell : value.rawCells())
{
if(new String(CellUtil.cloneQualifier(cell)).equals("GroupID")){
context.write(new Text(new String(CellUtil.cloneValue(cell))), new IntWritable(1));
}
}
}
} public static class Reduce extends
TableReducer<Text, IntWritable, NullWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
byte[] keyBytes = Bytes.toBytes(key.toString());
if(keyBytes.length>0){
Put put = new Put(keyBytes);
// Put实例化,每一个词存一行
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
Bytes.toBytes(String.valueOf(sum)));
// 列族为content,列为count,列值为数目
context.write(NullWritable.get(), put);
}
}
} public static void createHBaseTable(String tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","192.168.56.101,192.168.56.102,192.168.56.103");
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table exists, trying to recreate table......");
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
System.out.println("create new table:" + tableName);
admin.createTable(htd);
} public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
String tableName = "family_sum_by_groupid";
Configuration conf = HBaseConfiguration.create();
//conf.set("hbase.zookeeper.quorum","192.168.56.101,192.168.56.102,192.168.56.103");
createHBaseTable(tableName);
Job job = new Job(conf, "family_sum_by_groupid ");
job.setJarByClass(FamilyHBase.class);
Scan scan = new Scan();
scan.addFamily("cf".getBytes());
TableMapReduceUtil.initTableMapperJob("family3", scan, Map.class, Text.class, IntWritable.class, job);
TableMapReduceUtil.initTableReducerJob(tableName, Reduce.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

  2、从hdfs中取数据,把计算的结果插入到hdfs中

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; /**
* 从hdfs中取数分析,然后插入到hbase里
* @author liang.ye
*
*/
public class WordCountHBase { public static class Map extends
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable i = new IntWritable(1); public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s[] = value.toString().trim().split(" ");
// 将输入的每行以空格分开
for (String m : s) {
context.write(new Text(m), i);
}
}
} public static class Reduce extends
TableReducer<Text, IntWritable, NullWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
byte[] keyBytes = Bytes.toBytes(key.toString());
if(keyBytes.length>0){
Put put = new Put(keyBytes);
// Put实例化,每一个词存一行
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
Bytes.toBytes(String.valueOf(sum)));
// 列族为content,列为count,列值为数目
context.write(NullWritable.get(), put);
}
}
} public static void createHBaseTable(String tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","192.168.56.101,192.168.56.102,192.168.56.103");
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table exists, trying to recreate table......");
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
System.out.println("create new table:" + tableName);
admin.createTable(htd);
} public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
String tableName = "WordCount2";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
createHBaseTable(tableName);
String input = args[0];
Job job = new Job(conf, "WordCount table with " + input);
job.setJarByClass(WordCountHBase.class);
job.setNumReduceTasks(3);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(input));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

  

上一篇:【Java基础】14、位运算之——按位与(&)操作——(快速取模算法)


下一篇:关于