hbase 从hdfs上读取数据到hbase中

 <dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>

Mappper

 package cn.hbase.mapreduce.hdfs;

 import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; /**
*
* @author Tele 输入key hdfs上的文本的行号 输入value 文本 输出key 行键 输出value 将插入hbase的一行数据,需要行键
*
*/ public class ReadFruitFromHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 读取
String line = value.toString(); // 切割
/**
* 1001 apple red 1002 pear yellow 1003 pineapple yellow
*/
String[] fields = line.split("\t"); // 每个列族对应多个列
Map<String, Object> map = new HashMap<String, Object>(); // 封装列族下需要的列
List<String> infoCNList = new ArrayList<String>();
infoCNList.add("name");// 值对应field[1]
infoCNList.add("color");// 值对应field[2]
map.put("info", infoCNList); String row = fields[0]; // 封装
Put put = new Put(Bytes.toBytes(row)); // 遍历map,封装每个列族下的列
Set<Entry<String, Object>> entrySet = map.entrySet();
Iterator<Entry<String, Object>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Entry<String, Object> entry = iterator.next();
String cf = entry.getKey();
List<String> cnList = (List<String>) entry.getValue(); // 遍历list
for (int i = 0; i < cnList.size(); i++) {
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cnList.get(i)), Bytes.toBytes(fields[i + 1]));
}
} // 行键
ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(fields[0])); // 写出
context.write(immutableBytesWritable, put); } }

Reducer

 package cn.hbase.mapreduce.hdfs;

 import java.io.IOException;

 import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer; /**
*
*@author Tele
*
*对hbase上的表操作,继承tablereducer即可
*
*/ public class WriteFruitReducer extends TableReducer<ImmutableBytesWritable,Put,NullWritable> { @Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> value,Context context) throws IOException, InterruptedException {
for (Put put : value) {
context.write(NullWritable.get(), put);
}
} }

Runner

 package cn.hbase.mapreduce.hdfs;

 import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; /**
*
* @author Tele
*
*/ public class FruitRunner extends Configured implements Tool { public int run(String[] args) throws Exception {
// 实例化job
Job job = Job.getInstance(this.getConf()); // 设置jar包路径
job.setJarByClass(FruitRunner.class); // 组装mapper
job.setMapperClass(ReadFruitFromHdfsMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class); // 设置数据来源
FileInputFormat.addInputPath(job, new Path("/input_fruit")); // 组装reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitReducer.class, job); // 设置reduce个数
job.setNumReduceTasks(1); // 提交 return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
ToolRunner.run(new FruitRunner(), args); } }

ps:需要预先创建表

上一篇:hive的数据导入与数据导出:(本地,云hdfs,hbase),列分隔符的设置,以及hdfs上传给pig如何处理


下一篇:Sublime Text 3常用插件—Emmet