MapReduce流量统计

准备数据access.log

只需要用到三个字段:第2列的手机号、倒数第3列的上行流量、倒数第2列的下行流量

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   84138413    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13726238888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560436666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157985066   13726238888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  10000   20000   200

自定义复杂数据类型

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom composite value type holding the traffic figures of one phone number.
 *
 * <p>Rules followed by this custom data type:
 * <ol>
 *   <li>it implements the {@code Writable} interface;</li>
 *   <li>it implements the interface's {@code write} and {@code readFields} methods,
 *       reading the fields back in exactly the order they were written;</li>
 *   <li>it declares a no-argument constructor (easy to overlook).</li>
 * </ol>
 * Every field is exposed through a getter/setter pair.
 */
public class Access implements Writable {
    private String phone; // phone number
    private long up;      // upstream traffic
    private long down;    // downstream traffic
    private long sum;     // total traffic (upstream + downstream)

    /** No-argument constructor; fields are filled in later by {@link #readFields} or the setters. */
    public Access(){}

    /**
     * Creates a record and derives the total from the two traffic figures.
     *
     * @param phone phone number
     * @param up    upstream traffic
     * @param down  downstream traffic
     */
    public Access(String phone,long up,long down){
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    /**
     * Serializes the fields in the order phone, up, down, sum.
     *
     * @param out sink the fields are written to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone); // String
        out.writeLong(up);   // long
        out.writeLong(down);
        out.writeLong(sum);
    }

    /**
     * Restores the fields, reading them in the same order {@link #write} emitted them.
     *
     * @param in source the fields are read from
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    /** @return the phone number */
    public String getPhone() {
        return phone;
    }

    /** @param phone the phone number to store */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** @return the upstream traffic */
    public long getUp() {
        return up;
    }

    /** @param up the upstream traffic to store; {@code sum} is left untouched */
    public void setUp(long up) {
        this.up = up;
    }

    /** @return the downstream traffic */
    public long getDown() {
        return down;
    }

    /** @param down the downstream traffic to store; {@code sum} is left untouched */
    public void setDown(long down) {
        this.down = down;
    }

    /** @return the total traffic */
    public long getSum() {
        return sum;
    }

    /** @param sum the total traffic to store */
    public void setSum(long sum) {
        this.sum = sum;
    }

    @Override
    public String toString() {
        return "Access{" +
                "phone='" + phone + '\'' +
                ", up=" + up +
                ", down=" + down +
                ", sum=" + sum +
                '}';
    }
}

自定义Mapper处理

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Mapper that turns each access-log line into a (phone number, Access) pair.
 *
 * Type parameters:
 *      LongWritable: input key, the offset of the line
 *      Text:         input value, one line of data
 *      Text:         output key, the phone number
 *      Access:       output value, the composite traffic record
 */
public class AccessMapper extends Mapper<LongWritable, Text,Text,Access> {
    // A usable line needs at least: timestamp, phone, upstream, downstream, status.
    // With fewer fields the "third from last" index would overlap the phone column.
    private static final int MIN_FIELDS = 5;
    private static final String COUNTER_GROUP = "AccessMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Parses one tab-separated log line and emits its traffic keyed by phone number.
     * Lines with too few fields or non-numeric traffic values are skipped and
     * tallied in a job counter instead of failing the task.
     *
     * @param key     offset of the line (unused)
     * @param value   one line of the log
     * @param context sink for the (phone, Access) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        if (words.length < MIN_FIELDS) {
            // Blank or truncated line: indexing into it would throw or read the wrong column.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }
        String phone = words[1]; // second field: phone number
        long up;
        long down;
        try {
            up = Long.parseLong(words[words.length-3]);   // third from last: upstream traffic
            down = Long.parseLong(words[words.length-2]); // second from last: downstream traffic
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }
        // The Access constructor derives the total, so no separate sum is computed here.
        context.write(new Text(phone),new Access(phone,up,down));
    }
}

自定义Reducer处理

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Reducer that adds up the traffic of all records sharing one phone number.
 */
public class AccessReducer extends Reducer<Text,Access,Text,Access> {
    /**
     * Sums the upstream and downstream traffic of one phone number and emits the total.
     *
     * @param key     the phone number
     * @param values  every Access record emitted for that phone number
     * @param context sink for the aggregated (phone, Access) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;
        // All records in this group belong to the same phone number.
        for (Access record : values) {
            totalUp = totalUp + record.getUp();
            totalDown = totalDown + record.getDown();
        }
        Access aggregated = new Access(key.toString(), totalUp, totalDown);
        context.write(key, aggregated);
    }
}

编写Driver类

/**
 * Driver that configures and runs the traffic-statistics job, reading from the
 * "input" path and writing to the "output" path.
 */
public class AccessLocalApp {
    /**
     * Builds the job, waits for it to finish and exits with status 0 on success
     * or 1 on failure, so callers such as shell scripts can detect a failed run.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf);
        job.setJarByClass(AccessLocalApp.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(AccessMapper.class);
        job.setReducerClass(AccessReducer.class);
        // Set the key/value types emitted by the Mapper and by the Reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Access.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job,new Path("input"));
        FileOutputFormat.setOutputPath(job,new Path("output"));
        // Submit the job and propagate its outcome instead of discarding it
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

输出结果part-r-00000

13480253104 Access{phone='13480253104', up=180, down=180, sum=360}
13502468823 Access{phone='13502468823', up=7335, down=110349, sum=117684}
13560436666 Access{phone='13560436666', up=1116, down=954, sum=2070}
13560439658 Access{phone='13560439658', up=2034, down=5892, sum=7926}
13602846565 Access{phone='13602846565', up=1938, down=2910, sum=4848}
13660577991 Access{phone='13660577991', up=6960, down=690, sum=7650}
13719199419 Access{phone='13719199419', up=240, down=0, sum=240}
13726230503 Access{phone='13726230503', up=2481, down=24681, sum=27162}
13726238888 Access{phone='13726238888', up=12481, down=44681, sum=57162}
13760778710 Access{phone='13760778710', up=120, down=120, sum=240}
13826544101 Access{phone='13826544101', up=264, down=0, sum=264}
13922314466 Access{phone='13922314466', up=3008, down=3720, sum=6728}
13925057413 Access{phone='13925057413', up=11058, down=48243, sum=59301}
13926251106 Access{phone='13926251106', up=240, down=0, sum=240}
13926435656 Access{phone='13926435656', up=132, down=1512, sum=1644}
15013685858 Access{phone='15013685858', up=3659, down=3538, sum=7197}
15920133257 Access{phone='15920133257', up=3156, down=2936, sum=6092}
15989002119 Access{phone='15989002119', up=1938, down=180, sum=2118}
18211575961 Access{phone='18211575961', up=1527, down=2106, sum=3633}
18320173382 Access{phone='18320173382', up=9531, down=2412, sum=11943}
84138413    Access{phone='84138413', up=4116, down=1432, sum=5548}

重构思路

  • 可以看到,输出结果是Access{...}类型,其实就是toString格式问题,可以修改如下:
/** Renders the record as one comma-separated line: phone,up,down,sum. */
public String toString() {
    StringBuilder line = new StringBuilder();
    line.append(phone).append(",");
    line.append(up).append(",");
    line.append(down).append(",");
    line.append(sum);
    return line.toString();
}
  • 使用NullWritable
// Reducer type: the output key type becomes NullWritable
public class AccessReducer extends Reducer<Text,Access, NullWritable,Access> { ... }
// Reducer output: write NullWritable as the key
context.write(NullWritable.get(),new Access(key.toString(),ups,downs));
  • 升级Reducer的完整代码
/**
 * Reducer that totals the traffic per phone number and emits only the value,
 * using NullWritable as the output key.
 */
public class AccessReducer extends Reducer<Text,Access, NullWritable,Access> {
    /**
     * Sums the upstream and downstream traffic of one phone number and emits the total.
     *
     * @param key     the phone number
     * @param values  every Access record emitted for that phone number
     * @param context sink for the aggregated record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long upstreamTotal = 0L;
        long downstreamTotal = 0L;
        // Walk the group explicitly; every record here shares the same phone number.
        java.util.Iterator<Access> records = values.iterator();
        while (records.hasNext()) {
            Access record = records.next();
            upstreamTotal = upstreamTotal + record.getUp();
            downstreamTotal = downstreamTotal + record.getDown();
        }
        Access aggregated = new Access(key.toString(), upstreamTotal, downstreamTotal);
        context.write(NullWritable.get(), aggregated);
    }
}

新的输出结果

13480253104,180,180,360
13502468823,7335,110349,117684
13560436666,1116,954,2070
13560439658,2034,5892,7926
13602846565,1938,2910,4848
13660577991,6960,690,7650
13719199419,240,0,240
13726230503,2481,24681,27162
13726238888,12481,44681,57162
13760778710,120,120,240
13826544101,264,0,264
13922314466,3008,3720,6728
13925057413,11058,48243,59301
13926251106,240,0,240
13926435656,132,1512,1644
15013685858,3659,3538,7197
15920133257,3156,2936,6092
15989002119,1938,180,2118
18211575961,1527,2106,3633
18320173382,9531,2412,11943
84138413,4116,1432,5548

这样就拿到我们想要的数据结果了!

自定义Partitioner

需求:将统计结果按照手机号的前缀进行区分,写到不同的文件中。

  • 自定义Partitioner实现
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom MapReduce partitioning rule that routes records by phone-number prefix:
 * numbers starting with "13" go to partition 0, numbers starting with "15" to
 * partition 1, and all other numbers to partition 2.
 */
public class AccessPartitioner extends Partitioner<Text,Access> {
    private static final int PARTITION_13 = 0;
    private static final int PARTITION_15 = 1;
    private static final int PARTITION_OTHER = 2;

    /**
     * Chooses the partition for one record from the prefix of its phone number.
     *
     * @param phone         the phone number
     * @param access        the traffic record (not consulted)
     * @param numPartitions number of partitions configured for the job
     * @return 0, 1 or 2 as described on the class when at least three partitions
     *         exist; otherwise that value folded into {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Text phone, Access access, int numPartitions) {
        String number = phone.toString();
        int bucket;
        if (number.startsWith("13")) {
            bucket = PARTITION_13;
        } else if (number.startsWith("15")) {
            bucket = PARTITION_15;
        } else {
            bucket = PARTITION_OTHER;
        }
        // A partition index must stay below numPartitions; fold it so the job
        // still runs when fewer than three reduce tasks are configured.
        return numPartitions > 0 ? bucket % numPartitions : bucket;
    }
}
  • Driver参数
// Register the custom partitioning rule
job.setPartitionerClass(AccessPartitioner.class);
// Use three reduce tasks, matching the partitions 0, 1 and 2 that AccessPartitioner returns
job.setNumReduceTasks(3);
  • 再次运行,就得到了想要的统计结果,分别写入part-r-00000、part-r-00001、part-r-00002三个文件,内容依次如下:
13480253104,180,180,360
13502468823,7335,110349,117684
13560436666,1116,954,2070
13560439658,2034,5892,7926
13602846565,1938,2910,4848
13660577991,6960,690,7650
13719199419,240,0,240
13726230503,2481,24681,27162
13726238888,12481,44681,57162
13760778710,120,120,240
13826544101,264,0,264
13922314466,3008,3720,6728
13925057413,11058,48243,59301
13926251106,240,0,240
13926435656,132,1512,1644
15013685858,3659,3538,7197
15920133257,3156,2936,6092
15989002119,1938,180,2118
18211575961,1527,2106,3633
18320173382,9531,2412,11943
84138413,4116,1432,5548
上一篇:c语言--文件随机读写


下一篇:spark中的pair rdd,看这一篇就够了