统计手机流量 - 使用bean传输
上一篇博客中,使用的是键值对传输。但是,如果需要传输的字段很多怎么办呢?比如,需要同时传输上行流量、下行流量、总流量,这个时候就不适合再用简单键值对的方式传输了,而是要将这些字段封装为一个JavaBean。
JavaBean
/**
 * Writable bean carrying per-phone traffic counters between the map and
 * reduce phases. Field order in write() and readFields() must match exactly.
 */
public class Traffic implements Writable {
    // Phone number that owns this traffic record.
    private String phone;
    private int upStream;
    private int downStream;
    // Derived: upStream + downStream (set by setTraffic or deserialization).
    private int total;

    /** Hadoop requires a public no-arg constructor for deserialization. */
    public Traffic() {
    }

    /**
     * Populates all fields at once; total is derived as upStream + downStream.
     *
     * @param phone      the phone number this record belongs to
     * @param upStream   upstream traffic
     * @param downStream downstream traffic
     */
    public void setTraffic(String phone, int upStream, int downStream) {
        this.phone = phone;
        this.upStream = upStream;
        this.downStream = downStream;
        this.total = upStream + downStream;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public int getUpStream() {
        return upStream;
    }

    public void setUpStream(int upStream) {
        this.upStream = upStream;
    }

    public int getDownStream() {
        return downStream;
    }

    public void setDownStream(int downStream) {
        this.downStream = downStream;
    }

    public int getTotal() {
        return total;
    }

    public void setTotal(int total) {
        this.total = total;
    }

    /**
     * Deserialization; field order mirrors write().
     * BUG FIX: phone was previously not serialized at all, so any Traffic
     * read back after the shuffle had phone == null.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        upStream = in.readInt();
        downStream = in.readInt();
        total = in.readInt();
    }

    /** Serialization; field order mirrors readFields(). */
    @Override
    public void write(DataOutput out) throws IOException {
        // writeUTF rejects null, so guard in case setTraffic was never called.
        out.writeUTF(phone == null ? "" : phone);
        out.writeInt(upStream);
        out.writeInt(downStream);
        out.writeInt(total);
    }

    @Override
    public String toString() {
        return "[phone=" + phone + ", upStream=" + upStream + ", downStream=" + downStream + ", total=" + total + "]";
    }
}
Java自身的序列化需要实现Serializable接口,但是Hadoop提供了更加便于网络传输的Writable接口。实现这个接口需要实现两个方法:一个是序列化(write),另一个是反序列化(readFields),并且写出和读入的字段顺序要完全一致,否则反序列化会得到错乱的数据。
Mapper
/**
 * Parses one tab-separated log line per call and emits
 * (phone, Traffic(up, down)) pairs.
 *
 * Input record layout (assumed from the parsing below): 11 tab-separated
 * columns, column 0 = phone number, column 8 = upstream bytes,
 * column 9 = downstream bytes. TODO confirm against the actual log format.
 */
public class TrafficSumMapper extends Mapper<LongWritable, Text, Text, Traffic> {
    // Output objects are reused across map() calls to avoid per-record
    // allocation (standard Hadoop object-reuse pattern; safe because
    // context.write serializes immediately).
    private final Traffic traffic = new Traffic();
    private final Text phoneKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // FIX: local was misspelled "fileds" in the original.
        String[] fields = line.split("\t");
        // Silently skip malformed records rather than failing the whole task.
        if (fields.length != 11) {
            return;
        }
        int upStream = Integer.parseInt(fields[8]);
        int downStream = Integer.parseInt(fields[9]);
        traffic.setTraffic(fields[0], upStream, downStream);
        phoneKey.set(fields[0]);
        context.write(phoneKey, traffic);
    }
}
Reducer
/**
 * Sums the upstream/downstream counters of all Traffic values for one phone
 * number and emits a single aggregated Traffic record per key.
 */
public class TrafficSumReduce extends Reducer<Text, Traffic, Text, Traffic> {
    // Reused output value (Hadoop object-reuse pattern; safe because
    // context.write serializes immediately).
    private final Traffic result = new Traffic();

    @Override
    protected void reduce(Text key, Iterable<Traffic> values, Context context)
            throws IOException, InterruptedException {
        int totalUpStream = 0;
        int totalDownStream = 0;
        // FIX: the original also accumulated getTotal() into a local that was
        // never used — setTraffic re-derives total as up + down, so that
        // accumulation was dead code and has been removed.
        for (Traffic traffic : values) {
            totalUpStream += traffic.getUpStream();
            totalDownStream += traffic.getDownStream();
        }
        result.setTraffic(key.toString(), totalUpStream, totalDownStream);
        context.write(key, result);
    }
}
Main
/** Driver: configures and submits the traffic-sum MapReduce job. */
public class Main {
    /**
     * @param args optional: args[0] = input path, args[1] = output path;
     *             the original hard-coded local paths are kept as defaults.
     */
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);
        // BUG FIX: was TrafficCount.class, a class that does not exist in
        // this source — the driver class here is Main.
        job.setJarByClass(Main.class);
        job.setMapperClass(TrafficSumMapper.class);
        job.setReducerClass(TrafficSumReduce.class);
        // Map output types differ from job output only in documentation here;
        // both are (Text, Traffic), but Hadoop requires the map side to be
        // declared explicitly when it cannot be inferred.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Traffic.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Traffic.class);
        // Generalized: take paths from the command line when provided.
        String inputPath = args.length > 0 ? args[0] : "/home/hasee/bigdata/traffic/input/traffic.dat";
        String outputPath = args.length > 1 ? args[1] : "/home/hasee/bigdata/traffic/output1";
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}