针对微信的一篇推送附有的数据链接进行MapReduce统计

原推送引用:https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg

版权归原作者所有,如有侵权请及时联系本人,见谅!

 

原文采用Excel进行统计数据,这里采用刚学习的工具进行练习。

 

  1 import java.io.IOException;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.conf.Configured;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.IntWritable;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 14 import org.apache.hadoop.util.Tool;
 15 import org.apache.hadoop.util.ToolRunner;
 16 
 17 /**
 18  * https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg
 19  * 针对[新兴生态系统:Python和R语言,谁更适用于大数据Spark/Hadoop和深度学习?]
 20  * 的全球数据进行一系列统计
 21  */
 22 public class wechat  extends Configured implements Tool {
 23 
 24     /**
 25      * Map方法
 26      */
 27     private static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 28         private static final IntWritable mapOutputValue = new IntWritable(1) ; 
 29         private Text mapOutputKey = new Text() ;
 30         @Override
 31         public void map(LongWritable key, Text value, Context context)
 32             throws IOException, InterruptedException {
 33                 
 34             String input = value.toString();
 35             if(input.split(",").length<16) {
 36                 return;
 37             }
 38             String[] arrStr = input.split(",");           
 39             //Python-大数据计数器输出
 40             if("1".equals(arrStr[2])&&"1".equals(arrStr[14])) {
 41                 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_BigData").increment(1L);
 42             }
 43             //Python-Deep计数器输出
 44             if("1".equals(arrStr[2])&&"1".equals(arrStr[13])) {
 45                 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_Deep-Learning").increment(1L);
 46             }
 47             //R-大数据计数器输出
 48             if("1".equals(arrStr[3])&&"1".equals(arrStr[14])) {
 49                 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_BigData").increment(1L);
 50             }
 51             //R-深度计数器输出
 52             if("1".equals(arrStr[3])&&"1".equals(arrStr[13])) {
 53                 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_Deep-Learning").increment(1L);
 54             }
 55             
 56             arrStr = input.split(",")[16].split(";");
 57             //遍历
 58             for(String tool: arrStr){
 59                 // 设置key
 60                 mapOutputKey.set(tool);
 61                 // 输出
 62                 context.write(mapOutputKey, mapOutputValue) ;
 63             }
 64         }
 65     }
 66     
 67     /**
 68      * Reduce聚合结果
 69      */
 70     private static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
 71         private IntWritable outputValue = new IntWritable() ; 
 72         @Override
 73         protected void reduce(Text key, Iterable<IntWritable> values, Context context)
 74                 throws IOException, InterruptedException {
 75             
 76             // 定义临时变量,用于累加
 77             int sum = 0 ; 
 78 
 79             // 遍历
 80             for(IntWritable value: values){
 81                 sum += value.get() ;
 82             }
 83 
 84             if(sum < 500){
 85                 // 定义500以上的筛选
 86                 return ;
 87             }
 88             // 设置
 89             outputValue.set(sum) ;
 90             // 输出
 91             context.write(key, outputValue) ;
 92 
 93         }
 94     }
 95     
 96     /**
 97      * 驱动创建Job并提交运行  返回状态码
 98      */
 99     
100     public int run(String[] args) throws Exception {
101         // 创建一个Job
102         Job job = Job.getInstance(
103             this.getConf() , wechat.class.getSimpleName()
104         ) ;
105         // 设置job运行的class
106         job.setJarByClass(wechat.class);
107 
108         // 设置Job
109         // 1. 设置 input,从哪里读取数据
110         Path inputPath = new Path(args[0]) ;
111         FileInputFormat.addInputPath(job, inputPath);
112 
113         // 2. 设置 mapper类
114         job.setMapperClass(ModuleMapper.class);
115         // 设置map 输出的key和value的数据类型
116         job.setMapOutputKeyClass(Text.class);
117         job.setMapOutputValueClass(IntWritable.class);
118 
119         // 3. 设置 reducer 类
120         job.setReducerClass(ModuleReducer.class);
121         // 设置 reducer 输出的key和value的数据类型
122         job.setOutputKeyClass(Text.class);
123         job.setOutputValueClass(IntWritable.class);
124         // 设置ReduceTask个数
125        // job.setNumReduceTasks(2);
126 
127         // 4. 设置 处理结果保存的路径
128         Path outputPath = new Path(args[1]) ;
129         FileOutputFormat.setOutputPath(job, outputPath);
130 
131         // 提交job运行
132         boolean isSuccess = job.waitForCompletion(true) ;
133         
134         // 返回状态
135         return isSuccess ? 0 : 1;
136     }
137 
138     /**
139      * 
140      * @param args
141      * @throws Exception
142      */
143     public static void main(String[] args) throws Exception {
144         if(2 > args.length){
145             System.out.println("Usage: " + wechat.class.getSimpleName() +" <in> <out>");
146             return ;
147         }
148         
149         // 读取HADOOP中配置文件, core-*.xml hdfs-*.xml yarn-*.xml mapred-*.xml
150         Configuration conf = new Configuration() ;
151 
152         // 运行Job
153         int status = ToolRunner.run(conf, new wechat(), args) ;
154         
155         // exit program
156         System.exit(status);
157     }
158 
159 }

 

针对微信的一篇推送附有的数据链接进行MapReduce统计

上一篇:C# XSD校验XML文件的代码


下一篇:检测微信返回按钮事件