Original post: https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg
Copyright belongs to the original author; if anything here infringes, please contact me and I will take it down promptly.
The original article did its statistics in Excel; here the same numbers are reproduced, as an exercise, with the tool I have just been learning, Hadoop MapReduce.
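The mapper in the listing below treats each input line as one comma-separated survey record. The column meanings are taken from the mapper's own index checks: column 2 flags Python use, column 3 flags R use, column 13 deep learning, column 14 big data, and column 16 holds a semicolon-separated list of tools. A minimal, self-contained sketch of that assumed layout (the sample values are hypothetical, not real survey data):

import java.util.Arrays;

// Sketch only: builds one hypothetical record in the layout the mapper assumes
// and parses it the same way ModuleMapper does.
public class RecordSketch {
    public static void main(String[] args) {
        String[] record = new String[17];
        Arrays.fill(record, "");
        record[2]  = "1";                    // respondent uses Python
        record[3]  = "0";                    // respondent does not use R
        record[13] = "1";                    // works on deep learning
        record[14] = "1";                    // works on big data
        record[16] = "Python;Spark;Hadoop";  // semicolon-separated tool list
        String line = String.join(",", record);

        // Same parsing logic as the mapper.
        String[] fields = line.split(",");
        System.out.println("Python + big data:      " + ("1".equals(fields[2]) && "1".equals(fields[14])));
        System.out.println("Python + deep learning: " + ("1".equals(fields[2]) && "1".equals(fields[13])));
        for (String tool : fields[16].split(";")) {
            System.out.println("tool: " + tool);
        }
    }
}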
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg
 * Runs a set of statistics over the global survey data behind
 * "The emerging ecosystems: Python or R -- which is better suited for
 * big data (Spark/Hadoop) and deep learning?"
 */
public class wechat extends Configured implements Tool {

    /**
     * Mapper: parses each CSV record, updates the language/topic counters,
     * and emits one (tool, 1) pair per tool listed in column 16.
     */
    private static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable mapOutputValue = new IntWritable(1);
        private Text mapOutputKey = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] arrStr = value.toString().split(",");
            // Skip records that do not contain the tools column (index 16).
            if (arrStr.length < 17) {
                return;
            }

            // Python + big data counter
            if ("1".equals(arrStr[2]) && "1".equals(arrStr[14])) {
                context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_BigData").increment(1L);
            }
            // Python + deep learning counter
            if ("1".equals(arrStr[2]) && "1".equals(arrStr[13])) {
                context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_Deep-Learning").increment(1L);
            }
            // R + big data counter
            if ("1".equals(arrStr[3]) && "1".equals(arrStr[14])) {
                context.getCounter("WECHAT_MAPPER_COUNTERS", "R_BigData").increment(1L);
            }
            // R + deep learning counter
            if ("1".equals(arrStr[3]) && "1".equals(arrStr[13])) {
                context.getCounter("WECHAT_MAPPER_COUNTERS", "R_Deep-Learning").increment(1L);
            }

            // Column 16 is a semicolon-separated list of tools; emit one pair per tool.
            for (String tool : arrStr[16].split(";")) {
                mapOutputKey.set(tool);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    /**
     * Reducer: sums the counts per tool and keeps only tools mentioned at least 500 times.
     */
    private static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            // Accumulate the total count for this tool.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            // Only output tools with 500 or more mentions.
            if (sum < 500) {
                return;
            }

            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * Driver: creates the job, submits it, and returns a status code.
     */
    public int run(String[] args) throws Exception {
        // Create the job.
        Job job = Job.getInstance(this.getConf(), wechat.class.getSimpleName());
        // Set the class used to locate the jar.
        job.setJarByClass(wechat.class);

        // 1. Input: where the data is read from.
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // 2. Mapper class and its output key/value types.
        job.setMapperClass(ModuleMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3. Reducer class and its output key/value types.
        job.setReducerClass(ModuleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Number of reduce tasks (optional).
        // job.setNumReduceTasks(2);

        // 4. Output: where the results are written.
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for completion.
        boolean isSuccess = job.waitForCompletion(true);

        // Return the status code.
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: " + wechat.class.getSimpleName() + " <in> <out>");
            return;
        }

        // Load the Hadoop configuration: core-*.xml, hdfs-*.xml, yarn-*.xml, mapred-*.xml.
        Configuration conf = new Configuration();

        // Run the job through ToolRunner.
        int status = ToolRunner.run(conf, new wechat(), args);

        // Exit with the job status.
        System.exit(status);
    }

}
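The four Python/R versus big-data/deep-learning totals are accumulated in Hadoop counters, so they appear in the counter summary printed when waitForCompletion(true) finishes rather than in the output files (which hold only the per-tool counts with 500 or more mentions). If the driver should print those counters itself, a minimal sketch of a helper, assuming the counter group and names used in ModuleMapper above (not part of the listing, just an illustration):

import java.io.IOException;

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: prints the counters incremented in ModuleMapper. Call it from
// run() after job.waitForCompletion(true), e.g. CounterReport.print(job).
public class CounterReport {
    private static final String GROUP = "WECHAT_MAPPER_COUNTERS";
    private static final String[] NAMES = {
            "Python_BigData", "Python_Deep-Learning", "R_BigData", "R_Deep-Learning"
    };

    public static void print(Job job) throws IOException {
        Counters counters = job.getCounters();
        for (String name : NAMES) {
            System.out.println(name + " = " + counters.findCounter(GROUP, name).getValue());
        }
    }
}

Either way, the same values show up in the counter block the job client prints at the end of a successful run, alongside the built-in MapReduce counters. The job itself is submitted in the usual way after packaging, roughly: hadoop jar <your-jar> wechat <input path> <output path>.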