Sorting in MR

1. Sorting in MR

Both MapTask and ReduceTask sort their data by key. This is default Hadoop behavior: the data of every application is sorted, whether or not the logic requires it. The default order is lexicographic (dictionary) order, and the sort is implemented as quicksort.

  • MapTask: it first places its results in an in-memory ring buffer. When the buffer's usage reaches a threshold, the buffered data is quick-sorted once and the sorted data is spilled to disk; when spilling finishes, all the spill files on disk are merge-sorted.
  • ReduceTask: once all map outputs have been copied over, the ReduceTask performs a single merge sort over all the data held in memory and on disk.
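
The buffer size and the spill threshold are configurable. A minimal sketch, placed in a driver before creating the Job, assuming the standard Hadoop 2.x+ property names (100 MB and 0.80 are the usual defaults; verify them against your distribution):

Configuration conf = new Configuration();
// size of the in-memory ring buffer used by the map side, in MB
conf.set("mapreduce.task.io.sort.mb", "100");
// fraction of the buffer that, once used, triggers a sorted spill to disk
conf.set("mapreduce.map.sort.spill.percent", "0.80");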
  1. Partial sort
    MapReduce sorts the dataset by the keys of the input records, guaranteeing that each output file is internally sorted.
  2. Total sort
    The final output is a single, internally sorted file. The usual approach is to configure just one ReduceTask, but for large inputs this is extremely inefficient: a single machine processes all the data, completely giving up the parallelism MapReduce provides.
  3. Auxiliary sort (GroupingComparator grouping)
    Keys are grouped on the reduce side. Use case: when the key is a bean object and you want keys that agree on one or a few fields (but are not equal on all fields) to enter the same reduce() call, grouping can be used.
  4. Secondary sort
    In a custom sort, if compareTo compares on two criteria, it is a secondary sort (see the sketch after this list).
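
A minimal sketch of such a two-criteria compareTo, using a hypothetical bean (the class and field names are illustrative, not from the original): it orders first by group ascending, then by score descending.

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PairBean implements WritableComparable<PairBean> {
    private String group;
    private long score;

    @Override
    public int compareTo(PairBean o) {
        int byGroup = group.compareTo(o.group);  // first criterion: group ascending
        if (byGroup != 0) {
            return byGroup;
        }
        return Long.compare(o.score, score);     // second criterion: score descending
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(group);
        out.writeLong(score);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.group = in.readUTF();
        this.score = in.readLong();
    }
}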

2. WritableComparable

If a bean object is used as the map output key, it must implement the WritableComparable interface and override compareTo to define the sort order.

2.1 Total sort

Data

01        a00df6s      kar          120.196.100.99    384                         33                           200
log id    device id    vendor id    ip                self-owned duration (s)     third-party duration (s)     network status code

Requirement

Sort the devices in descending order of total play duration (self-owned duration + third-party duration).

bean

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SpeakCompareBean implements WritableComparable<SpeakCompareBean> {
    private String deviceId;
    private long selfDuration;       // self-owned play duration
    private long thirdPartDuration;  // third-party play duration
    private long sumDuration;        // total play duration

    public SpeakCompareBean() {
    }

    public SpeakCompareBean(String deviceId, long selfDuration, long thirdPartDuration) {
        this.deviceId = deviceId;
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.sumDuration = selfDuration + thirdPartDuration;
    }

  
    @Override
    public int compareTo(SpeakCompareBean o) {
        // Descending by total duration: the larger total sorts first,
        // matching the requirement of a descending total-duration sort.
        return Long.compare(o.sumDuration, this.sumDuration);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(deviceId);
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeLong(sumDuration);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.deviceId= dataInput.readUTF();
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.sumDuration = dataInput.readLong();
    }

    @Override
    public String toString() {
        return deviceId + "\t" +
                selfDuration + "\t" +
                thirdPartDuration + "\t" +
                sumDuration;
    }
}

map

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SpeakCompareBeanMapper extends Mapper<LongWritable, Text,SpeakCompareBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the tab-separated record: column 1 is the device id,
        // columns 4 and 5 are the two play durations.
        String[] fields = value.toString().trim().split("\t");
        String deviceId = fields[1];
        long selfDuration = Long.parseLong(fields[4]);
        long thirdPartDuration = Long.parseLong(fields[5]);
        SpeakCompareBean speakCompareBean = new SpeakCompareBean(deviceId, selfDuration, thirdPartDuration);
        // The bean is the key, so the framework sorts on its compareTo; the value carries nothing.
        context.write(speakCompareBean, NullWritable.get());
    }
}

reduce

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SpeakCompareBeanReducer extends Reducer<SpeakCompareBean, NullWritable, NullWritable,SpeakCompareBean> {
    @Override
    protected void reduce(SpeakCompareBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Devices with an equal total duration collapse into one key group;
        // iterate the values so each such record is still written out.
        for (NullWritable value : values) {
            context.write(value, key);
        }
    }
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class SpeakCompareBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SpeakCompareBeanDriver.class);

        job.setMapperClass(SpeakCompareBeanMapper.class);
        job.setReducerClass(SpeakCompareBeanReducer.class);

        job.setMapOutputKeyClass(SpeakCompareBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SpeakCompareBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean result =job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}
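
Note that this driver never calls setNumReduceTasks, so the job runs with the default of a single ReduceTask, which is exactly what the total sort requires: one internally sorted output file. Making that assumption explicit is a one-line addition to the driver:

job.setNumReduceTasks(1);  // a total sort needs exactly one reducer, hence one sorted file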

Output

(output screenshot from the original post, not reproduced)

2.2 GroupingComparator

GroupingComparator is a reduce-side component of MapReduce. Its job is to decide which records form one group, i.e. one invocation of the reduce() logic. By default, every distinct key forms its own group and triggers its own reduce() call; with a custom GroupingComparator, different keys can be treated as one group and handled by a single reduce() call.

Data

(The sample order data appeared as an image in the original; per the Mapper below, records are tab-separated with the order id in column 0 and the transaction amount in column 2.)
Requirement

Find the transaction with the largest amount within each order.

Approach

  • Mapper
    Read a line of text and split out the fields; wrap the order id and the amount in a bean object whose sort rule is: first by order id, and for equal order ids, by amount descending. map() emits kv pairs: key -> the bean object, value -> NullWritable.get().

  • Shuffle
    Specify a partitioner to guarantee that records with the same order id go to the same partition (a custom partitioner).

  • Specify a GroupingComparator whose rule is that keys belong to the same group whenever their order ids are equal.

  • Reduce
    Each reduce() call writes out only the first key of its group.

bean

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private  String  orderId;
    private Double price;
    public OrderBean(){}

    public OrderBean(String  orderId,Double price){
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public int compareTo(OrderBean o) {
        // Sort by order id first; within the same order, sort by price
        // descending so the largest transaction comes first in its group.
        int i = orderId.compareTo(o.getOrderId());
        if (i == 0) {
            return -price.compareTo(o.getPrice());
        }
        return i;
    }

    @Override
    public String toString() {
        return
                "orderId='" + orderId + '\'' +
                ", price=" + price ;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeDouble(price);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }
}

partitioner

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        // Custom partitioning: send records with the same order id to the same reducer.
        // The & Integer.MAX_VALUE mask clears the sign bit so a negative
        // hashCode cannot produce a negative partition index.
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % i;
    }
}

WritableComparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator  extends WritableComparator {
    // Register OrderBean with this comparator: the grouping comparator will
    // group records of the OrderBean key type. Passing true tells the
    // framework to create bean instances reflectively for deserialization.
    public OrderGroupingComparator() {
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Keys belong to the same group whenever their order ids are equal;
        // the price field is deliberately ignored here.
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        return first.getOrderId().compareTo(second.getOrderId());
    }
}
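
This comparator only takes effect once the driver registers it with job.setGroupingComparatorClass(OrderGroupingComparator.class), which step 7 of the driver below does.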

map

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Column 0 holds the order id and column 2 the transaction amount.
        String[] fields = value.toString().trim().split("\t");
        String orderId = fields[0];
        Double price = Double.parseDouble(fields[2]);
        context.write(new OrderBean(orderId, price), NullWritable.get());
    }
}

reduce

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer  extends Reducer<OrderBean, NullWritable,OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Within each group, the first key is already the transaction with the
        // largest amount (keys sort by amount descending within the same order id),
        // so writing the key once emits the group's maximum.
        context.write(key, NullWritable.get());
    }
}
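
One caveat worth knowing when extending this reducer: Hadoop reuses the key object while the values iterator advances, so within a group each iteration step makes key reflect the fields of the current record. The reducer above exploits this by not iterating at all, which leaves key at the group's first (largest) record. A hedged sketch of a hypothetical top-N variant of the reduce body that does iterate:

// Hypothetical variant: emit the two largest transactions of each order.
int emitted = 0;
for (NullWritable v : values) {
    // key now holds the current record's fields because the framework reuses the object
    context.write(key, NullWritable.get());
    if (++emitted == 2) {
        break;
    }
}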

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the configuration and create the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 Set the class whose jar contains this program
        job.setJarByClass(OrderDriver.class);
        // 3 Set the Mapper/Reducer classes this job uses
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // 4 Set the kv types of the mapper output
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 Set the kv types of the final output
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 6 Set the job's input and output paths

        FileInputFormat.setInputPaths(job, new Path("E:\\hdfs_test_dir\\groupingComparator.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hdfs_test_dir\\order_output"));
        // 7 Set the partitioner, the grouping comparator, and the number of reduce tasks
        job.setPartitionerClass(OrderPartitioner.class);
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        job.setNumReduceTasks(2);
        // 8 Submit the job's configuration and the jar containing its classes to YARN and wait
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
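
With setNumReduceTasks(2), the job produces two output files, each internally sorted; the custom partitioner guarantees that all records of a given order id land in the same file, so the per-order maximum is still computed correctly.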

Output

(output screenshots from the original post, not reproduced)
