统计手机流量 - 使用bean传输

统计手机流量 - 使用bean传输

上一篇博客中,使用的是简单的键值对传输。但是,如果要传输的字段很多怎么办呢?比如,需要同时传输上行流量、下行流量、总流量,这个时候单个基本类型的value就不够用了,而是要将这些字段封装为一个JavaBean,作为value来传输。

JavaBean

/**
 * Value object carrying the traffic figures of one phone number between the
 * map and reduce phases.
 *
 * <p>Serialization goes through {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)}; both must process the fields in exactly the
 * same order. All four fields, including {@code phone}, are serialized so that
 * a deserialized instance is a faithful copy of the one that was written.
 */
public class Traffic implements Writable{
        private String phone;
        private int upStream;
        private int downStream;
        private int total;

        /** No-argument constructor; fields are populated later via setters or {@link #readFields(DataInput)}. */
        public Traffic() {
        }

        /**
         * Sets all fields at once; {@code total} is derived as
         * {@code upStream + downStream}.
         *
         * @param phone      phone number the figures belong to
         * @param upStream   upstream traffic
         * @param downStream downstream traffic
         */
        public void setTraffic(String phone, int upStream, int downStream) {
                this.phone = phone;
                this.upStream = upStream;
                this.downStream = downStream;
                this.total = upStream + downStream;
        }

        public String getPhone() {
                return phone;
        }

        public void setPhone(String phone) {
                this.phone = phone;
        }

        public int getUpStream() {
                return upStream;
        }

        public void setUpStream(int upStream) {
                this.upStream = upStream;
        }

        public int getDownStream() {
                return downStream;
        }

        public void setDownStream(int downStream) {
                this.downStream = downStream;
        }

        public int getTotal() {
                return total;
        }

        public void setTotal(int total) {
                this.total = total;
        }

        /**
         * Restores every field from {@code in}, in the order used by
         * {@link #write(DataOutput)}.
         *
         * @param in source to read the serialized fields from
         * @throws IOException if reading from {@code in} fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
                // Always assign phone (null when absent) so that a reused instance
                // never keeps the phone number of a previously read record.
                phone = in.readBoolean() ? in.readUTF() : null;
                upStream = in.readInt();
                downStream = in.readInt();
                total = in.readInt();
        }

        /**
         * Writes every field to {@code out}: a presence flag for {@code phone},
         * the phone itself when present, then the three counters.
         *
         * @param out sink to write the serialized fields to
         * @throws IOException if writing to {@code out} fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
                // writeUTF(null) would throw, so a boolean marks whether phone follows.
                boolean hasPhone = phone != null;
                out.writeBoolean(hasPhone);
                if (hasPhone) {
                        out.writeUTF(phone);
                }
                out.writeInt(upStream);
                out.writeInt(downStream);
                out.writeInt(total);
        }

        @Override
        public String toString() {
                return "[phone=" + phone + ", upStream=" + upStream + ", downStream=" + downStream + ", total=" + total + "]";
        }
}

Java自带的序列化需要实现Serializable接口,但是Hadoop提供了更加便于网络传输的Writable接口。实现这个接口需要实现两个方法:write用于序列化,readFields用于反序列化,写出和读入的字段顺序要完全一致。

Mapper

/**
 * Mapper that turns one tab-separated log line into a
 * {@code (phone, Traffic)} pair.
 *
 * <p>A line is used only when it has exactly 11 columns. Column 0 is taken as
 * the phone number, column 8 as the upstream traffic and column 9 as the
 * downstream traffic. Lines with a different column count, or whose traffic
 * columns are not valid integers, are skipped.
 */
public class TrafficSumMapper extends Mapper<LongWritable, Text, Text, Traffic>{

        // Output objects are created once and overwritten for every record.
        Traffic traffic = new Traffic();
        Text k = new Text();

        /**
         * Parses one input line and emits the phone number with its traffic figures.
         *
         * @param key     input key supplied by the framework (not used here)
         * @param value   one line of the input file
         * @param context sink for the emitted {@code (phone, Traffic)} pair
         * @throws IOException          if emitting the pair fails
         * @throws InterruptedException if emitting the pair is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] fields = value.toString().split("\t");
                if (fields.length != 11) {
                        return;
                }

                int upStream;
                int downStream;
                try {
                        upStream = Integer.parseInt(fields[8]);
                        downStream = Integer.parseInt(fields[9]);
                } catch (NumberFormatException ignored) {
                        // A non-numeric traffic column marks a malformed record; skip it
                        // like a wrong-length line instead of failing the whole task.
                        return;
                }

                String phone = fields[0];
                traffic.setTraffic(phone, upStream, downStream);
                k.set(phone);
                context.write(k, traffic);
        }

}

Reducer


/**
 * Reducer that adds up the upstream and downstream traffic of every
 * {@link Traffic} value received for one key and emits a single summary.
 */
public class TrafficSumReduce extends Reducer<Text, Traffic, Text, Traffic>{

        /**
         * Sums the traffic of all values for {@code key} and writes one result.
         *
         * @param key     the key shared by all {@code values}; also stored as the
         *                phone of the emitted {@link Traffic}
         * @param values  the traffic records grouped under {@code key}
         * @param context sink for the emitted {@code (key, Traffic)} pair
         * @throws IOException          if emitting the pair fails
         * @throws InterruptedException if emitting the pair is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<Traffic> values, Context context) throws IOException, InterruptedException {
                int totalUpStream = 0;
                int totalDownStream = 0;
                for (Traffic traffic : values) {
                        totalUpStream += traffic.getUpStream();
                        totalDownStream += traffic.getDownStream();
                }

                // No separate total is accumulated: setTraffic derives it from the
                // two sums.
                Traffic summary = new Traffic();
                summary.setTraffic(key.toString(), totalUpStream, totalDownStream);
                context.write(key, summary);
        }
}

Main

/**
 * Driver that configures and submits the traffic-summing job.
 *
 * <p>Usage: {@code Main [inputPath [outputPath]]}. When an argument is
 * omitted, the corresponding default path below is used.
 */
public class Main {
        /** Input path used when no first argument is given. */
        private static final String DEFAULT_INPUT_PATH = "/home/hasee/bigdata/traffic/input/traffic.dat";
        /** Output path used when no second argument is given. */
        private static final String DEFAULT_OUTPUT_PATH = "/home/hasee/bigdata/traffic/output1";

        /**
         * Builds the job, waits for it to finish and exits with status 0 on
         * success or 1 on failure.
         *
         * @param args optional {@code [inputPath, outputPath]}
         * @throws Exception if the job cannot be configured or submitted
         */
        public static void main(String[] args) throws Exception {
                String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
                String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

                Configuration config = new Configuration();
                Job job = Job.getInstance(config);

                // Locate the job jar through this driver class.
                job.setJarByClass(Main.class);

                job.setMapperClass(TrafficSumMapper.class);
                job.setReducerClass(TrafficSumReduce.class);

                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Traffic.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Traffic.class);

                FileInputFormat.setInputPaths(job, new Path(inputPath));
                FileOutputFormat.setOutputPath(job, new Path(outputPath));

                boolean succeeded = job.waitForCompletion(true);
                System.exit(succeeded ? 0 : 1);
        }
}
上一篇:nginx.conf


下一篇:Nginx反向代理400错误