hadoop执行hdfs文件到hbase表插入操作(xjl456852原创)

本例中需要将hdfs上的文本文件,解析后插入到hbase的表中.

本例用到的hadoop版本2.7.2 hbase版本1.2.2

hbase的表如下:
  1. create 'ns2:user', 'info'
hdfs上的文本文件如下[data/hbase_input/hbase.txt]
  1. 1,xiejl,20
  2. 2,haha,30
  3. 3,liudehua,40
  4. 4,daoming,41
可以通过命令查看hadoop的classpath现在包含哪些jar包:
  1. [hadoop@master ~]$ hdfs classpath

java的主方法:
  1. package com.xjl456852.mapreduce;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.hbase.client.Put;
  4. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  5. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import java.io.IOException;
  10. /**
  11. * 将hdfs中的文本文件写入到hbase的表中
  12. * 程序的运行需要加入hadoop的配置文件和hbase的配置文件到jar包中
  13. * 对应的hbase的表
  14. * create 'ns2:user','info'
  15. *
  16. * Created by xiejl on 2016/8/10.
  17. */
  18. public class HBaseApp {
  19. public static void main(String [] args) {
  20. try {
  21. Job job = Job.getInstance();
  22. job.setJobName("text into hbase table");
  23. job.setJarByClass(HBaseApp.class);
  24. FileInputFormat.addInputPath(job, new Path(args[0]));
  25. //设置表名
  26. job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, args[1]);
  27. //设置输出格式为table
  28. job.setOutputFormatClass(TableOutputFormat.class);
  29. //设置输出的key类型为ImmutableBytesWritable
  30. job.setOutputKeyClass(ImmutableBytesWritable.class);
  31. //设置输出的value类型为Put
  32. job.setOutputValueClass(Put.class);
  33. //因为map输出key和reduce输出的key类型不一致,所以需要再设置map的key输出类型为Text
  34. job.setMapOutputKeyClass(Text.class);
  35. //因为map输出value和reduce输出的value类型不一致,所以需要再设置map的value输出类型为Text
  36. job.setMapOutputValueClass(Text.class);
  37. //Mapper
  38. job.setMapperClass(MyMapper.class);
  39. //Reducer
  40. job.setReducerClass(MyReducer.class);
  41. System.exit(job.waitForCompletion(true) ? 0 : 1);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. } catch (IOException e) {
  45. e.printStackTrace();
  46. } catch (ClassNotFoundException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. }
Mapper类:
  1. package com.xjl456852.mapreduce;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. /**
  7. * Created by xiejl on 2016/8/10.
  8. */
  9. public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
  10. @Override
  11. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  12. String line = value.toString();
  13. int index = line.indexOf(",");
  14. String rowKey = line.substring(0, index);
  15. //跳过逗号
  16. String valueLine = line.substring(index+1);
  17. context.write(new Text(rowKey), new Text(valueLine));
  18. }
  19. }
Reducer类:
  1. package com.xjl456852.mapreduce;
  2. import org.apache.hadoop.hbase.client.Put;
  3. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Reducer;
  7. import java.io.IOException;
  8. /**
  9. * Created by xiejl on 2016/8/11.
  10. */
  11. public class MyReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
  12. @Override
  13. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  14. byte[] rowKey = Bytes.toBytes(key.toString());
  15. for(Text text : values) {
  16. //设置put对象的行键
  17. Put put = new Put(rowKey);
  18. String line = text.toString();
  19. int index = line.indexOf(",");
  20. String name = line.substring(0, index);
  21. String age = line.substring(index+1);
  22. //列族的是建表时固定的,列和值是插入时添加的.
  23. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),Bytes.toBytes(name));
  24. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"),Bytes.toBytes(age));
  25. context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
  26. }
  27. }
  28. }
将程序打为jar包,jar包中需要加入hadoop的配置文件和hbase的配置 (有人将程序打成胖包,就是将依赖的jar,依赖的四个类库hbase-client,hbase-server,hbase-common,hbsae-protocol放入lib目录中,我试验不行,会出现map和reduce任务都执行到100%时卡住不动,等十分钟又出现 FAILED AttemptID:attempt_xxx Timed out after 600 secs,然后又重新执行mapreduce任务,然后又卡住,得结束掉mapreduce进程才能终止)

需要修改集群的配置文件,以满足hadoop执行hbase表插入操作时,能找到相关的类库.

将HBase的类jar包加到hadoop的classpath下, 修改${HADOOP_HOME}/etc/hadoop/hadoop-env.sh。配置好这个文件,分发到各个节点,改这个配置不用重启集群.

  1. TEMP=`ls /opt/modules/hbase/lib/*.jar`
  2. HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
  3. HADOOP_CLASSPATH=$HBASE_JARS

如果现在运行程序还是会出错,详情可以看我的另一篇文章.hadoop执行hbase插入表操作,出错:Stack trace: ExitCodeException exitCode=1:(xjl456852原创)

还需要在${HADOOP_HOME}/etc/hadoop/yarn-site.xml中加入mapreduce运行时需要的类库,需要设置yarn.application.classpath:
所以我在yarn-site.xml中加入了如下配置,并加入了hbase的lib目录,配置好这个文件,分发到各个节点,这个配置需要重启集群
  1. <property>
  2. <name>yarn.application.classpath</name>
  3. <value>
  4. /opt/modules/hadoop/etc/*,
  5. /opt/modules/hadoop/etc/hadoop/*,
  6. /opt/modules/hadoop/lib/*,
  7. /opt/modules/hadoop/share/hadoop/common/*,
  8. /opt/modules/hadoop/share/hadoop/common/lib/*,
  9. /opt/modules/hadoop/share/hadoop/mapreduce/*,
  10. /opt/modules/hadoop/share/hadoop/mapreduce/lib/*,
  11. /opt/modules/hadoop/share/hadoop/hdfs/*,
  12. /opt/modules/hadoop/share/hadoop/hdfs/lib/*,
  13. /opt/modules/hadoop/share/hadoop/yarn/*,
  14. /opt/modules/hadoop/share/hadoop/yarn/lib/*,
  15. /opt/modules/hbase/lib/*
  16. </value>
  17. </property>

然后执行:
  1. hadoop jar hbase.jar com.xjl456852.mapreduce.HBaseApp data/hbase_input ns2:user
查看hbase的表内容:
  1. hbase(main):001:0> scan 'ns2:user'
  2. ROW COLUMN+CELL
  3. 1 column=info:age, timestamp=1470966325326, value=20
  4. 1 column=info:name, timestamp=1470966325326, value=xiejl
  5. 2 column=info:age, timestamp=1470966325326, value=30
  6. 2 column=info:name, timestamp=1470966325326, value=haha
  7. 3 column=info:age, timestamp=1470966325326, value=40
  8. 3 column=info:name, timestamp=1470966325326, value=liudehua
  9. 4 column=info:age, timestamp=1470966325326, value=41
  10. 4 column=info:name, timestamp=1470966325326, value=daoming
  11. 4 row(s) in 0.3100 seconds
可以看到数据已经插入到hbase表中.


还可以将Reducer类写成继承TableReducer方式,代码如下,执行后会有同样的结果:
  1. package com.xjl456852.mapreduce;
  2. import org.apache.hadoop.hbase.client.Put;
  3. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  4. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  5. import org.apache.hadoop.hbase.util.Bytes;
  6. import org.apache.hadoop.io.Text;
  7. import java.io.IOException;
  8. /**
  9. * 如果继承TableReducer,从源码中可以看到,输出的value是Mutation类型,也就是输出的值可以是Put,Delete之类的类型
  10. * Created by xiejl on 2016/8/11.
  11. */
  12. public class MyReducer2 extends TableReducer<Text, Text, ImmutableBytesWritable> {
  13. @Override
  14. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  15. byte[] rowKey = Bytes.toBytes(key.toString());
  16. for(Text text : values) {
  17. //设置put对象的行键
  18. Put put = new Put(rowKey);
  19. String line = text.toString();
  20. int index = line.indexOf(",");
  21. String name = line.substring(0, index);
  22. String age = line.substring(index+1);
  23. //列族的是建表时固定的,列和值是插入时添加的.
  24. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),Bytes.toBytes(name));
  25. put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"),Bytes.toBytes(age));
  26. context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
  27. }
  28. context.getCounter("reduce", "over").increment(1);
  29. }
  30. }


上一篇:[转]C#综合揭秘——细说进程、应用程序域与上下文之间的关系


下一篇:valuestack,stackContext,ActionContext.之间的关系以及如何存取数值的