MapReduce之输出结果排序


 前面的案例中我们介绍了统计出每个用户的上行流量,下行流量及总流量,现在我们想要将输出的结果按照总流量倒序排序。

MapReduce之输出结果排序

实现思路

 MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前会排序),排序的依据是map输出的key。所以我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法来指定比较规则

实现步骤

1.自定义Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * 存储流量相关数据
 * @author 波波烤鸭
 *
 */
public class Flow implements WritableComparable<Flow> {
    // 上下流量
    private long upFlow;

    // 下行流量
    private long downFlow;
    // 总流量
    private long sumFlow;
    
    /**
     * 比较Flow对象的总流量
     */
    @Override
    public int compareTo(Flow o) {
        // TODO Auto-generated method stub
        return -(int)(this.sumFlow - o.getSumFlow());

    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public Flow(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * 无参构造方法必须要有 反射的时候需要用到
     */
    public Flow() {
        super();
    }

    /**
     * 序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * 反序列化 反序列化的顺序和序列化的顺序一致
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

2.Map阶段

public class FlowCountMap extends Mapper<LongWritable, Text, Flow, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 将一行数据转换为String
        String line = value.toString();
        // 切分字段
        String[] fields = line.split("\t");
        // 取出手机号
        String phoneNum = fields[0];
        // 取出上行流量下行流量
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        Flow flow = new Flow(upFlow, downFlow);
        context.write(flow, new Text(phoneNum));
    }
}

3.Reduce阶段

public class FlowCountReducer extends Reducer<Flow, Text, Text, Flow>{

    @Override
    protected void reduce(Flow flow, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        
        String phone = values.iterator().next().toString();
        // 输出结果
        context.write(new Text(phone), flow);
    }
}

4.启动类

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        // 输出到HDFS文件系统中
        // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
        // 输出到本地文件系统
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(FlowTest.class);
        
        // 指定本job要使用的map/reduce的工具类
        job.setMapperClass(FlowCountMap.class);
        job.setReducerClass(FlowCountReducer.class);
        
        // 指定mapper输出kv的类型
        job.setMapOutputKeyClass(Flow.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);
        
        // 指定job的原始文件输入目录
        // 6.设置输出输出类
        FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/sort/input/"));
        FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/sort/output/"));
                
        //将job中配置的相关参数,以及job所用的jar包提交给yarn运行
        //job.submit();  waitForCompletion等待执行完成
        boolean flag = job.waitForCompletion(true);
        System.exit(flag?0:1);
    }
}

5.输出结果

MapReduce之输出结果排序

MapReduce之输出结果排序

MapReduce之输出结果排序

成功倒序输出

本案例的目的有两个:

   实现对输出结果排序我们可以在自定义对象的compareTo方法中指定

   如果一次MapReduce任务获取不到我们需要的结果我们可以对输出的结果做多次MapReduce任务。


上一篇:FAST DUAL执行计划?


下一篇:6、CISCO 2960交换机RSPAN配置