一、Combiner 概述
Combiner 合并是 Shuffle 阶段的一个可选操作
,旨在提前对数据进行一次合并(将 <a, 1>, <a, 1>, <a, 1> 合并为 <a, 3>),以减少 Reducer 的压力
。通常情况下我们会开启大量的 MapTask,而 ReduceTask 的个数很少,所以 Reducer 要拉取和处理数据量非常大,如果提前对数据进行合并,例如将1万个键相同的数据合并成一个,那么 Reducer 的工作就会大大减少。
Combiner 大概会发生两次
- 第一次发生在环形缓冲区分区排序后,溢写到硬盘前。
- 第二次是在多个溢写文件合并,然后归并排序后。
可以把 Combiner 理解为提前的 Reducer
,只不过这个 Reducer 不是针对一个分区上的数据,而是针对单个 MapTask 处理的数据。
二、使用 Combiner
新建 Java 类文件 WordCountCombiner
package com.pineapple.mapreduce.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Combiner 属于 Shuffle 阶段
* 前后两个泛型都是 Map 输出的 KV 的类型
*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private final IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context
context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key, outV);
}
}
继承 Recuer 类并实现泛型,Combiner 的输入输出 都是在 Shuffle 阶段,所以输入输出泛型和 Mapper 的输出泛型 一致。
在 Driver 文件中指定 Combiner
// 设置Combiner
job.setCombinerClass(WordCountCombiner.class);
我们在输出日志中可以看到 Combiner 的相关信息
而如果不设置 Combiner ,默认是这样的
三、一些细节
如果我们指定 ReduceTask 个数为0,会发生什么?
// 设置Combiner
job.setCombinerClass(WordCountCombiner.class);
job.setNumReduceTasks(0);
运行是可以运行的,但是只有 Map 阶段,没有 Shuffle 阶段和 Reduce阶段
最终输出的文件名为 part-m-00000
,而正常应该是 part-r-00000
,所以这里是在 Map 阶段就直接输出了,数据也没有进行合并,值全是1
其实这个案例的 Combiner 中的代码 和 Reducer 是完全一样的,我们可以直接用 Reducer 作为 Combiner
// 设置Combiner
job.setCombinerClass(WordCountReducer.class);
四、总结
- Combiner 合并操作是 Shuffle 阶段中的可选操作
- 需要继承 Reducer 类,输入输出泛型和 Mapper 的输出泛型一致
- 如果没有 Reducer,那么 Shuffle 阶段也没有,直接 Map 后输出
- 很多情况下可以用 Reducer 作为 Combiner
Github 仓库地址:https://github.com/pineapple-cpp/MapReduceDemo
喜欢我的文章的话,欢迎关注