hadoop MapReduce辅助排序解析

1、数据样本,w1.csv到w5.csv,每个文件数据样本2000条,第一列是年份从1990到2000随机,第二例是数据从1-100随机,本例辅助排序目标是找出每年最大值,实际上结果每年最大就是100,但是这里通过mapreduce辅助排序方式来找。

1999,71
1994,57
1995,33
1993,44
1994,99
1994,83
1995,59
... ...

2、核心概念:

这里先预先假设有海量的数据,所以按照hash算法将所有数据分区后,但是确保同一年的数据进入到同一个分区,也就是同一个reduce里,但是因为采用组合键的key,并不能保证同一年的键都到一个组里。
因为还有可能在这个分区里还有其他年份的数据。所以在分区后,需要再进行分组,确保即使一个分区里有多个年份数据,但是每个年份数据都在自己的分组里运行。

分区的目的就是增加并行计算,把各年份的数据分到不同的reduce
因为资源的问题,分区不能无限,但是年份会有很多,所以总会有的分区会多出一年的数据。
这个时候就需要分区内再额外分组,确保符合键(或者自定义对象)可以合并到一个分组里。

3、IntPair,本例把整行数据解析后,年份和数据都放入key,需要自定义一个IntPair对象,实际生产环境中可根据需求自定义各种类.

public class IntPair implements WritableComparable<IntPair> {
   private int first;
    private int second;

    public IntPair() {
    }

    public IntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }


    @Override
    public int compareTo(IntPair o) {
        int result = Integer.valueOf(first).compareTo(o.getFirst());
        if(result==0){
            result = Integer.valueOf(second).compareTo(o.getSecond());
        }
        return result;
    }


    public static int compare(int first1,int first2){
        return Integer.valueOf(first1).compareTo(Integer.valueOf(first2));
    }



    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }


    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        IntPair intPair = (IntPair) o;

        if (first != intPair.first) return false;
        return second == intPair.second;
    }

    @Override
    public int hashCode() {
        int result = first;
        result = 31 * result + second;
        return result;
    }

    @Override
    public String toString() {
        return first+"\t"+second;
    }
}

 

4、RecordParser,记录解析器,用于解析数据,规避错误数据

public class RecordParser {
    private int year;
    private int data;
    private boolean valid;

    public int getYear() {
        return year;
    }

    public int getData() {
        return data;
    }

    public boolean isValid() {
        return valid;
    }

    public void parse(String value){
        String[] sValue = value.split(",");
        try {
            year = Integer.parseInt(sValue[0]);
            data = Integer.parseInt(sValue[1]);
            valid = true;
        }catch (Exception e){
            valid = false;
        }
    }
}

 

5、分区器

/**
 * @Author: xu.dm
 * @Date: 2019/2/21 11:56
 * @Description:根据key进行分区,确保同一个key.first进入相同的分区,泛型类型和mapper输出一致
 */
public class FirstPartitioner extends Partitioner<IntPair,IntWritable> {
    /**
     * Get the partition number for a given key (hence record) given the total
     * number of partitions i.e. number of reduce-tasks for the job.
     * <p>
     * <p>Typically a hash function on a all or a subset of the key.</p>
     *
     * @param key       the key to be partioned.
     * @param value  the entry value.
     * @param numPartitions the total number of partitions.
     * @return the partition number for the <code>key</code>.
     */
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        return Math.abs(key.getFirst() * 127) % numPartitions;
    }
}

 

 

6、key比较器,map阶段的key排序使用,如果没有分组比较器,则key比较器也会应用在混洗和reduce阶段。

/**
 * @Author: xu.dm
 * @Date: 2019/2/21 11:59
 * @Description: key比较器
 * 对IntPair的first升序,second降序,在mapper排序的时候被应用
 * 最终同样年份的数据第一条是最大的。
 */
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(IntPair.class,true);//需要实例化
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair p1=(IntPair)a;
        IntPair p2=(IntPair)b;
        int result = IntPair.compare(p1.getFirst(),p2.getFirst());
        if(result==0){
            result = -IntPair.compare(p1.getSecond(),p2.getSecond()); //前面加一个减号求反
        }
        return result;
    }
}

 

7、分组比较器,这里最关键,看注释。

/**
 * @Author: xu.dm
 * @Date: 2019/2/21 12:16
 * @Description: 分组比较器,确保同一个年份的数据在同一个组里
 * 这个是本例最特殊的地方,之前key比较器使得key值中的年份升序,数据降序排列。
 * 那么这个分组比较器只按年进行比较,意味着,[1990,100]和[1990,00]会被认为是相同的分组,
 * 而,reduce阶段,相同的KEY只取第一个,哦也,这个时候,reduce阶段后,年份中最大的数据就被保存下来,其他数据都被kickout
 * 所以,用这种方式变相的达到取最大值得效果。
 */
public class GroupComparator extends WritableComparator {
    public GroupComparator() {
        super(IntPair.class,true);
    }


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair p1=(IntPair)a;
        IntPair p2=(IntPair)b;
        return IntPair.compare(p1.getFirst(),p2.getFirst());
    }
}

 

8、mapper,如果只取年份里的最大数据,Mapper<LongWritable,Text,IntPair,IntWritable> 的IntWritable可以用NullWritable,这里保留IntWritable是因为,程序稍加改动就可以输出所有年份数据的计数

public class DataMapper extends Mapper<LongWritable,Text,IntPair,IntWritable> {
    private RecordParser parser = new RecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        parser.parse(value.toString());
        if(parser.isValid()){
            context.write(new IntPair(parser.getYear(),parser.getData()),new IntWritable(1));
            context.getCounter("MapValidData","dataCounter").increment(1); //做一个计数,总的数据应该是10000条。
        }
    }
}

 

9、reducer

public class DataReducer extends Reducer<IntPair,IntWritable,IntPair,IntWritable> {

    @Override
    protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //因为分组器,[1990,100]和[1990,00]会被认为是相同的分组
        //这里的计数就会混淆。如果需要年份下各数据的正确的计数结果,则需要注销分组器
//        for(IntWritable val:values){
//            sum+=val.get();
//        }

        context.write(key,new IntWritable(sum));
    }
}

10、job

public class DataSecondarySort extends Configured implements Tool {
    /**
     * Execute the command with the given arguments.
     *
     * @param args command specific arguments.
     * @return exit code.
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = Job.getInstance(conf,"Secondary Sort");
//        conf.set("mapreduce.job.ubertask.enable","true");

        if(conf==null){
            return -1;
        }

        job.setJarByClass(DataSecondarySort.class);
        job.setMapperClass(DataMapper.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class);
// 决定如何分组 job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(DataReducer.class); // job.setNumReduceTasks(2);//如果数据海量,则可以根据情况设置reduce的数目,也是分区的数量,通过Tool类,也可以在命令行进行设置 job.setOutputKeyClass(IntPair.class); //如果只求最大数,前面的mapper,reducer和这里的输出都可以设置成NullWritable job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); Path outPath = new Path(args[1]); FileSystem fileSystem = outPath.getFileSystem(conf); //删除输出路径 if(fileSystem.exists(outPath)) { fileSystem.delete(outPath,true); } return job.waitForCompletion(true) ? 0:1; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new DataSecondarySort(),args); System.exit(exitCode); } }

 

11、如果只求最大值,结果会是这样:

[hadoop@bigdata-senior01 ~]$ hadoop fs -cat /output3/part-r-00000 | more
1990    100    0
1991    100    0
1992    100    0
1993    100    0
1994    100    0
1995    100    0
1996    100    0
1997    100    0
1998    100    0
1999    100    0
2000    100    0

如果要求最大值和计数则会列出所有数据,当然需要注销分组器的set代码,并打开reducer的sum

[hadoop@bigdata-senior01 ~]$ hadoop fs -cat /output/part-r-00000 | more
1990    100    10
1990    99    15
1990    98    10
1990    97    9
1990    96    6
1990    95    4
1990    94    12
1990    93    9
1990    92    12
1990    91    13
1990    90    8
1990    89    9
... ...

多个分区可以使用job.setNumReduceTasks(n),或者在命令行上指定

[hadoop@bigdata-senior01 ~]$ hadoop jar DataSecondarySort.jar -D mapreduce.job.reduces=3 /sampler /output3

[hadoop@bigdata-senior01 ~]$ hadoop fs -ls /output3
Found 4 items
-rw-r--r-- 1 hadoop supergroup 0 2019-02-21 15:29 /output3/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 2868 2019-02-21 15:29 /output3/part-r-00000
-rw-r--r-- 1 hadoop supergroup 3860 2019-02-21 15:29 /output3/part-r-00001
-rw-r--r-- 1 hadoop supergroup 3850 2019-02-21 15:29 /output3/part-r-00002

 

总结一下:最容易混淆的概念就是分组,而排序和分组实际上就是MapReduce最核心的地方。

 

上一篇:OpenFeign服务接口调用


下一篇:TYVJ P1026 犁田机器人 Label:水