MapReduce-Shuffle003

文章目录

分区

通过分区,将不同的数据交给不同的 Reduce,产生不同的结果.

MapTask

我们把写好的代码打包成Jar运行在集群上.那么,其中的map方法在每个节点都有一个.
这个方法这时可以看做是MapTask
MapReduce-Shuffle003

Partitioner 分区类

而 getPartition 方法 返回的是 Reduce 的编号
第几个 Reduce
MapReduce-Shuffle003

重写 getPartitioner 方法

整个步骤和上篇没有分区的文章差不多,就是在JobMain中进行分区类的设置. 以及设置 Reduce 的个数

/**
 * PartitionerOwner
 * @desc
 * @author xxx
 * @date 2021/9/4
 */
public class PartitionerOwner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text text, LongWritable longWritable, int i) {
        if (text.toString().length() >= 5) {
            return 0;
        }
        return 1;
    }

}
/**
 * JobMain
 * @desc MR分区
 * @author xxx
 * @date 2021/9/3
 */
public class JobShuffleMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // 使用 ToolRunner.run 运行下面的方法. 需要传递 configuration jobName args
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new JobMain(), args);
        System.out.println("任务状态码 :: "+status);
        System.exit(status);
    }

    @Override
    public int run(String[] strings) throws Exception {
        // 获得Job实例
        Job job = Job.getInstance(super.getConf(), "mShuffler_word_count");

        // 在集群上运行的话,设置主类
        job.setJarByClass(JobShuffleMain.class);

        // 设置从哪里读取数据
        job.setInputFormatClass(TextInputFormat.class);

        // 设置读取路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path(Constant.URL + "//wordcount"));

        // 设置 map
        job.setMapperClass(WordCountShuffleMapper.class);
        // 设置 map 输出的 key value 的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置 shuffle 分区
        job.setPartitionerClass(PartitionerOwner.class);


        // 设置 reduce
        job.setReducerClass(WordCountShuffleReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置 reduce 的个数
        job.setNumReduceTasks(2);

        // 设置 Output
        job.setOutputFormatClass(TextOutputFormat.class);
        // 通过 TextOutputFormat 设置输出路径
        TextOutputFormat.setOutputPath(job,new Path(Constant.URL + "/mrShuffle/"));

        // 设置 状态吗
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

}
上一篇:MapReduce执行流程


下一篇:MapReduce编程:单词计数--《大数据基础教程》