文章目录
分区
通过分区,将不同的数据交给不同的 Reduce,产生不同的结果.
MapTask
我们把写好的代码打包成Jar运行在集群上.那么,其中的map方法在每个节点都有一个.
这个方法这时可以看做是MapTask
Partitioner 分区类
而 getPartition 方法 返回的是 Reduce 的编号
第几个 Reduce
重写 getPartitioner 方法
整个步骤和上篇没有分区的文章差不多,就是在JobMain中进行分区类的设置. 以及设置 Reduce 的个数
/**
* PartitionerOwner
* @desc
* @author xxx
* @date 2021/9/4
*/
public class PartitionerOwner extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
if (text.toString().length() >= 5) {
return 0;
}
return 1;
}
}
/**
* JobMain
* @desc MR分区
* @author xxx
* @date 2021/9/3
*/
public class JobShuffleMain extends Configured implements Tool {
public static void main(String[] args) throws Exception {
// 使用 ToolRunner.run 运行下面的方法. 需要传递 configuration jobName args
Configuration configuration = new Configuration();
int status = ToolRunner.run(configuration, new JobMain(), args);
System.out.println("任务状态码 :: "+status);
System.exit(status);
}
@Override
public int run(String[] strings) throws Exception {
// 获得Job实例
Job job = Job.getInstance(super.getConf(), "mShuffler_word_count");
// 在集群上运行的话,设置主类
job.setJarByClass(JobShuffleMain.class);
// 设置从哪里读取数据
job.setInputFormatClass(TextInputFormat.class);
// 设置读取路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path(Constant.URL + "//wordcount"));
// 设置 map
job.setMapperClass(WordCountShuffleMapper.class);
// 设置 map 输出的 key value 的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置 shuffle 分区
job.setPartitionerClass(PartitionerOwner.class);
// 设置 reduce
job.setReducerClass(WordCountShuffleReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置 reduce 的个数
job.setNumReduceTasks(2);
// 设置 Output
job.setOutputFormatClass(TextOutputFormat.class);
// 通过 TextOutputFormat 设置输出路径
TextOutputFormat.setOutputPath(job,new Path(Constant.URL + "/mrShuffle/"));
// 设置 状态吗
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
}