1. A MapReduce job that does not need a reduce phase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount03 {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // The fourth comma-separated field is the gender column
            String s = line.split(",")[3];
            if (s.equals("男")) {   // keep only the male records
                context.write(new Text(s), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        /*
         * Some jobs need no reduce (aggregation) step. The number of
         * reduce tasks defaults to 1, so it must be set to 0 explicitly;
         * otherwise a default reduce task is launched even though it
         * does not process any data.
         */
        job.setNumReduceTasks(0);
        job.setJobName("mr03 job");
        job.setJarByClass(WordCount03.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        Path in = new Path("/word");
        FileInputFormat.addInputPath(job, in);
        Path out = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(out)) {
            fs.delete(out, true);   // recursive delete; delete(Path) alone is deprecated
        }
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }
}
Note:
In some cases no reduce (aggregation) step is needed; when there is nothing to aggregate, the reduce phase can be dropped. The number of reduce tasks defaults to 1, so it has to be set to 0 by hand.
If it is not set to 0, a default reduce task still runs; it just does not process any data.
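To make the map-only job concrete, here is a hypothetical run (the sample records below are an assumption; the original only shows that the fourth comma-separated field is the gender). Suppose /word contains:

    1001,zhangsan,20,男
    1002,lisi,19,女
    1003,wangwu,21,男

The mapper keeps only the records whose fourth field is 男, and because there are no reduce tasks its output goes straight to HDFS as part-m-00000 (map-only jobs produce part-m-* files instead of the usual part-r-* files), one line per kept record:

    男
    男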
2. Join operations in MapReduce (combining two datasets)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class WordCount04 {
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Find out which file this record came from via the InputSplit.
            //    (Upstream of this context is HDFS; downstream is the reducer
            //    if there is one, otherwise HDFS.)
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fs = (FileSplit) inputSplit;
            String url = fs.getPath().toString();
            // 2. Key every record by student id: it is the join condition
            //    between the two files. Tag each record by source so the
            //    reducer can tell the two kinds of data apart.
            String id = value.toString().split(",")[0];
            if (url.contains("students")) {   // record comes from students.txt
                context.write(new Text(id), new Text("*" + value.toString()));
            } else {                          // record comes from score.txt
                context.write(new Text(id), new Text("#" + value.toString()));
            }
        }
    }

    public static class JoinReduce extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Collect the data outside the loop: one student record
            // and any number of score records per key.
            String stuInfo = "";
            ArrayList<String> scores = new ArrayList<String>();
            for (Text value : values) {
                // Each value is one line, from either students.txt or score.txt
                String line = value.toString();
                if (line.startsWith("*")) {   // student record
                    stuInfo = line.substring(1);
                } else {                      // score record
                    scores.add(line.substring(1));
                }
            }
            // Join the two tables: append subject and score to the student info.
            for (String score : scores) {
                String subject = score.split(",")[1];
                String s = score.split(",")[2];
                String end = stuInfo + "," + subject + "," + s;
                context.write(new Text(end), NullWritable.get());
            }
            // Variant: join the two tables while summing each student's scores.
            // long sum = 0L;
            // for (String s : scores) {
            //     Integer sc = Integer.valueOf(s.split(",")[2]);
            //     sum += sc;
            // }
            // String end = stuInfo + "," + sum;
            // context.write(new Text(end), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("Join MapReduce");
        job.setJarByClass(WordCount04.class);

        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(JoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path("/word"));
        Path out = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(out)) {
            fs.delete(out, true);   // recursive delete; delete(Path) alone is deprecated
        }
        FileOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);
        System.out.println(success ? "join job completed" : "join job failed");
    }
}
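To trace the join end to end, assume hypothetically (the original does not show its data files) that /word holds a students.txt such as

    1001,zhangsan,20,男
    1002,lisi,19,女

and a score.txt with lines of the form id,subject,score:

    1001,math,90
    1001,english,85
    1002,math,78

The mapper tags each line (* for students, # for scores) and keys it by student id, so every reduce call receives one student record plus all of that student's score records, and emits one joined line per score:

    1001,zhangsan,20,男,math,90
    1001,zhangsan,20,男,english,85
    1002,lisi,19,女,math,78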
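A design note on the split-path check: tagging records inside a single mapper works, but Hadoop also ships org.apache.hadoop.mapreduce.lib.input.MultipleInputs, which binds a separate mapper to each input path. A minimal sketch of the alternative driver wiring, assuming the two files were placed in separate directories /students and /scores, and with StudentMapper and ScoreMapper as hypothetical mappers that each unconditionally prepend their own tag:

    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Replaces the single FileInputFormat.addInputPath(job, new Path("/word")) call.
    // The directory names and the two mapper classes are assumptions for illustration.
    MultipleInputs.addInputPath(job, new Path("/students"), TextInputFormat.class, StudentMapper.class);
    MultipleInputs.addInputPath(job, new Path("/scores"), TextInputFormat.class, ScoreMapper.class);

With this wiring each mapper already knows which file it is reading, so the context.getInputSplit() check disappears.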