(转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

多个mapreduce工作相互依赖处理方法完整实例(JobControl)

原文地址:http://mntms.iteye.com/blog/2096456?utm_source=tuicool&utm_medium=referral

处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。

代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释

先是代码的主体部分:

(转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

上代码:

  1. /*
  2. * anthor TMS
  3. */
  4. package 依赖MR处理方法;
  5. import java.io.IOException;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
  15. import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. public class MODEL {
  18. //第一个Job的map函数
  19. public static class Map_First extends Mapper<Object, Text  ,Text , IntWritable>{                                                                                                     private final static IntWritable one = new IntWritable(1);
  20. private Text keys = new Text();
  21. public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {
  22. String s = value.toString();
  23. String[]  allStr = Config.CatString(s);
  24. keys.set(allStr[1]);
  25. context.write(keys, one);
  26. }
  27. }
  28. //第一个Job的reduce函数
  29. public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> {
  30. private IntWritable result = new IntWritable();
  31. public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
  32. int sum = 0;
  33. for(IntWritable value:values) {
  34. sum  +=  value.get();
  35. }
  36. result.set(sum);
  37. context.write(key, result);
  38. }
  39. }
  40. //第二个job的map函数
  41. public static class Map_Second extends Mapper<Object, Text  ,Text , IntWritable>{
  42. private final static IntWritable one = new IntWritable(1);
  43. private Text keys = new Text();
  44. public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {
  45. String s = value.toString();
  46. String[]  allStr = Config.CatString(s);
  47. keys.set(allStr[1]);
  48. context.write(keys, one);
  49. }
  50. }
  51. //第二个Job的reduce函数
  52. public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> {
  53. private IntWritable result = new IntWritable();
  54. public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
  55. int sum = 0;
  56. for(IntWritable value:values) {
  57. sum  +=  value.get();
  58. }
  59. result.set(sum);
  60. context.write(key, result);
  61. }
  62. }
  63. //启动函数
  64. public static void main(String[] args) throws IOException {
  65. JobConf conf = new JobConf(MODEL.class);
  66. //第一个job的配置
  67. Job job1 = new Job(conf,"join1");
  68. job1.setJarByClass(MODEL.class);
  69. job1.setMapperClass(Map_First.class);
  70. job1.setReducerClass(Reduce_First.class);
  71. job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key
  72. job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value
  73. job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key
  74. job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value
  75. //加入控制容器
  76. ControlledJob ctrljob1=new  ControlledJob(conf);
  77. ctrljob1.setJob(job1);
  78. //job1的输入输出文件路径
  79. FileInputFormat.addInputPath(job1, new Path(args[0]));
  80. FileOutputFormat.setOutputPath(job1, new Path(args[1]));
  81. //第二个job的配置
  82. Job job2=new Job(conf,"Join2");
  83. job2.setJarByClass(MODEL.class);
  84. job2.setMapperClass(Map_Second.class);
  85. job2.setReducerClass(Reduce_Second.class);
  86. job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key
  87. job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value
  88. job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key
  89. job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value
  90. //作业2加入控制容器
  91. ControlledJob ctrljob2=new ControlledJob(conf);
  92. ctrljob2.setJob(job2);
  93. //设置多个作业直接的依赖关系
  94. //如下所写:
  95. //意思为job2的启动,依赖于job1作业的完成
  96. ctrljob2.addDependingJob(ctrljob1);
  97. //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
  98. FileInputFormat.addInputPath(job2, new Path(args[1]));
  99. //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
  100. //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
  101. FileOutputFormat.setOutputPath(job2,new Path(args[2]) );
  102. //主的控制容器,控制上面的总的两个子作业
  103. JobControl jobCtrl=new JobControl("myctrl");
  104. //添加到总的JobControl里,进行控制
  105. jobCtrl.addJob(ctrljob1);
  106. jobCtrl.addJob(ctrljob2);
  107. //在线程启动,记住一定要有这个
  108. Thread  t=new Thread(jobCtrl);
  109. t.start();
  110. while(true){
  111. if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息
  112. System.out.println(jobCtrl.getSuccessfulJobList());
  113. jobCtrl.stop();
  114. break;
  115. }
  116. }
  117. }
  118. }

工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名)

(转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

接下来是arguments如下所示:

(转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1

(转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!

上一篇:jQuery省市区三级联动菜单


下一篇:maven配置阿里镜像仓库