用户流量:
把对应的电话号码的上行下行流量进行统计,最后显示出手机号 上行流量 下行流量 总流量
Mapper类:
static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text>{ protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { //读取一行数据 String line = value.toString(); //对数据进行有规则截取 String[] words = line.split("\t"); //获取有效数据 String phone = words[1]; String shang = words[2]; String xia = words[3]; //写入上下文 context.write(new Text(phone), new Text(shang +","+xia)); } }
Reducer类:
static class WordCountReducer extends Reducer<Text, Text, Text, Text>{ protected void reduce(Text phone, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { //定义空变量 int sum = 0 ; String shang = null; String xia = null; for (Text value : values) { String [] words = value.toString().split(","); shang = words[0]; xia = words[1]; sum = Integer.parseInt(shang) + Integer.parseInt(xia); } context.write(phone, new Text(shang +" "+ xia+" "+ sum)); } }
提交主类:
public static void main(String[] args) throws Exception { //加载配置文件 Configuration config = new Configuration(); //创建Job对象 Job job = Job.getInstance(config); //设置提交主类 job.setJarByClass(Phone.class); //设置Mapper相关的参数 job.setMapperClass(PhoneMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reducer类相关的参数 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入路径 FileInputFormat.setInputPaths(job, new Path("/shuju1904/phone.txt")); //设置输出路径 FileOutputFormat.setOutputPath(job, new Path("/shuju1904/output888")); //提交 job.waitForCompletion(true); }