MapReduce原理深入理解(二)

1.Mapreduce操作不需要reduce阶段

 

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.FileSystem;
 3 import org.apache.hadoop.fs.Path;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.NullWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 
12 import java.io.IOException;
13 
14 public class WordCount03 {
15     public static class MyMapper extends Mapper<LongWritable, Text,Text, NullWritable>{
16         @Override
17         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
18             String line = value.toString();
19             String s = line.split(",")[3];
20             if(s.equals("男")){
21                 context.write(new Text(s),NullWritable.get());
22             }
23         }
24     }
25     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
26         Job job= Job.getInstance();
27         job.setNumReduceTasks(0);
28         /**
29          * 有些情况下,不需要reduce(聚合程序),
30          * 在不需要聚合操作的时候,可以不需要reduce
31          * 而reduce默认为1,需要手动设置为0,
32          * 如果没有设置为0,会产生默认的reduce,只不过reduce不处理任何数据
33          */
34         job.setJobName("mr03程序");
35         job.setJarByClass(WordCount03.class);
36         job.setMapOutputKeyClass(Text.class);
37         job.setMapOutputValueClass(NullWritable.class);
38         Path in = new Path("/word");
39         FileInputFormat.addInputPath(job,in);
40         Path out = new Path("/output");
41         FileSystem fs = FileSystem.get(new Configuration());
42         if(fs.exists(out)){
43             fs.delete(out);
44         }
45         FileOutputFormat.setOutputPath(job,out);
46         job.waitForCompletion(true);
47     }
48 }

 

注意:

有些情况下,不需要reduce(聚合程序),
在不需要聚合操作的时候,可以不需要reduce
而reduce默认为1,需要手动设置为0,
如果没有设置为0,会产生默认的reduce,只不过reduce不处理任何数据

2.MapReduce中join操作(数据拼接)
  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.NullWritable;
  6 import org.apache.hadoop.io.Text;
  7 import org.apache.hadoop.mapreduce.InputSplit;
  8 import org.apache.hadoop.mapreduce.Job;
  9 import org.apache.hadoop.mapreduce.Mapper;
 10 import org.apache.hadoop.mapreduce.Reducer;
 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 12 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 14 
 15 import java.io.IOException;
 16 import java.util.ArrayList;
 17 
 18 public class WordCount04 {
 19     public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
 20         @Override
 21         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 22             //1.获取数据的路径 InputSplit
 23             //context 上面是hdfs 下面如果有reduce就是reduce 没有就是hdfs
 24             InputSplit inputSplit = context.getInputSplit();
 25             FileSplit fs=(FileSplit)inputSplit;
 26             String url = fs.getPath().toString();
 27             //2.判断
 28             if(url.contains("students")){//true当前数据为students.txt
 29                 String id = value.toString().split(",")[0];
 30                 //为了方便reduce数据的操作 针对于不同的数据 打一个标签
 31                 String line = "*" + value.toString();
 32                 context.write(new Text(id),new Text(line));
 33             }else {//false 当前数据为score.txt
 34                 //以学号作为k 也是两张数据的关联条件
 35                 String id = value.toString().split(",")[0];
 36                 //为了方便reduce数据的操作 针对于不同的数据 打一个标签
 37                 String line = "#" + value.toString();
 38                 context.write(new Text(id),new Text(line));
 39             }
 40         }
 41     }
 42     public static class JoinReduce extends Reducer<Text,Text,Text,NullWritable>{
 43         @Override
 44         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 45             //数据在循环之外保存
 46             String stuInfo="";
 47             ArrayList<String> scores = new ArrayList<String>();
 48             //提取数据
 49             for (Text value : values) {
 50                 //获取一行一行的数据(所有数据包含students.txt和score.txt)
 51                 String line = value.toString();
 52                 if(line.startsWith("*")){//true 为学生数据
 53                     stuInfo= line.substring(1);
 54                 }else {//false  为学生成绩数据
 55                     scores.add(line.substring(1));
 56                 }
 57             }
 58             /**
 59              * 求的是 两张表的拼接
 60              */
 61             //数据拼接
 62             for (String score : scores) {
 63                 String subject = score.split(",")[1];
 64                 String s = score.split(",")[2];
 65                 String end=stuInfo+","+subject+","+s;
 66                 context.write(new Text(end),NullWritable.get());
 67             }
 68             /**
 69              * 求的是 两张表的拼接 拼接过程中对成绩求和
 70              */
 71 //            long sum=0l;
 72 //            for (String s : scores) {
 73 //                Integer sc =Integer.valueOf( s.split(",")[2]);
 74 //                sum+=sc;
 75 //            }
 76 //            String end=stuInfo+","+sum;
 77 //            context.write(new Text(end),NullWritable.get());
 78         }
 79     }
 80     public static void main(String[] args) throws Exception {
 81         Job job = Job.getInstance();
 82         job.setJobName("Join MapReduce");
 83         job.setJarByClass(WordCount04.class);
 84 
 85         job.setMapperClass(JoinMapper.class);
 86         job.setMapOutputKeyClass(Text.class);
 87         job.setMapOutputValueClass(Text.class);
 88 
 89         job.setReducerClass(JoinReduce.class);
 90         job.setOutputKeyClass(Text.class);
 91         job.setOutputValueClass(NullWritable.class);
 92         //指定路径
 93         FileInputFormat.addInputPath(job,new Path("/word"));
 94         Path path = new Path("/output");
 95         FileSystem fs = FileSystem.get(new Configuration());
 96         if(fs.exists(path)){
 97             fs.delete(path);
 98         }
 99         FileOutputFormat.setOutputPath(job,new Path("/output"));
100         job.waitForCompletion(true);
101         System.out.println("join 正在执行");
102     }
103 }

 

 

 

上一篇:10.Mapreduce实例——MapReduce自定义输入格式小


下一篇:MySQL默认隔离级别的记录锁和间隙锁