Hadoop in Action: Combiners

Contents

Why use a combiner

  • It reduces the number of key/value pairs shuffled across the network
  • It helps mitigate data skew

Designing a combiner

The combiner must be equivalent to the reducer in the data transformation it performs: running it zero or more times must not change the final output.

  • If the reducer computes only a distributive function (max/min/sum/count), the reducer class itself can serve as the combiner
  • Otherwise, design a separate combiner and reducer
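
The distinction can be checked in plain Java (a standalone sketch, independent of Hadoop): summation is distributive, so partial results merge freely, while the mean of partial means generally differs from the global mean.

```java
import java.util.stream.DoubleStream;

public class DistributiveCheck {
    static double sum(double[] xs) { return DoubleStream.of(xs).sum(); }
    static double mean(double[] xs) { return sum(xs) / xs.length; }

    public static void main(String[] args) {
        double[] a = {1, 2, 3};   // one mapper's values
        double[] b = {10, 20};    // another mapper's values
        // sum is distributive: merging partial sums gives the global sum
        System.out.println(sum(new double[]{sum(a), sum(b)}));        // 36.0
        // mean is not: the mean of partial means != the global mean
        System.out.println(mean(new double[]{mean(a), mean(b)}));     // 8.5
        System.out.println(mean(new double[]{1, 2, 3, 10, 20}));      // 7.2
    }
}
```

This is why the averaging example below carries (sum, count) pairs through the combiner and divides only at the very end.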

Example: a combiner for computing averages

An extra count column is added to the output, turning the averaging task into summing value and count. These sums are distributive, so the reduce logic can also run as a combiner (with a slightly adjusted output format).

  • Mapper output: (key: [value, count])
  • Combiner output: (key: [sum(value), sum(count)]), the same format as the mapper output
  • Reducer output: (key: [sum(value) / sum(count)])

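
A worked trace of one key (with hypothetical numbers) shows why this format composes: each combiner merges its own mapper's local pairs, and the reducer merges the merged pairs.

```java
public class AverageTrace {
    // Each record is a (sum, count) pair; merging adds componentwise
    static long[] merge(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    public static void main(String[] args) {
        // Mapper 1 emitted (US, "3,1") and (US, "5,1"); its combiner merges them
        long[] c1 = merge(new long[]{3, 1}, new long[]{5, 1}); // -> "8,2"
        // Mapper 2 emitted only (US, "4,1"); its combiner passes it through
        long[] c2 = {4, 1};
        // The reducer merges all combiner outputs, then divides once
        long[] total = merge(c1, c2);                          // -> "12,3"
        System.out.println((double) total[0] / total[1]);      // 4.0
    }
}
```

The full MapReduce implementation: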
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

// Example: computing an average per key with a combiner
public class AverageByAttributeWithCombiner extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // A negative limit keeps trailing empty fields
            String[] fields = value.toString().split(",", -1);
            String country = fields[4];
            String numClaims = fields[8];

            // Skip records whose claims field is missing or quoted (non-numeric)
            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                // Emit "value,count" so averaging becomes two distributive sums
                context.write(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

    public static class ReduceClass extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (Text val : values) {
                // Each value is "sum,count" from either a mapper or a combiner
                String[] fields = val.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            // Only the reducer divides; dividing earlier would break distributivity
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (Text val : values) {
                String[] fields = val.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            // Emit partial sums in the same "sum,count" format the mapper uses,
            // so the combiner may run zero or more times without changing the result
            context.write(key, new Text(sum + "," + count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = Job.getInstance(conf, "AverageByAttributeWithCombiner");
        job.setJarByClass(AverageByAttributeWithCombiner.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Return the exit code instead of calling System.exit() here,
        // so ToolRunner can propagate it from main()
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new AverageByAttributeWithCombiner(), args);

        System.exit(exitCode);
    }
}

Checking the combiner's effect

Compare two job counters after a run:

  • Map output records: the number of records emitted by the mappers
  • Reduce input records: the number of records received by the reducers
When the combiner is effective, Reduce input records is far smaller than Map output records; the Combine input records and Combine output records counters show the reduction directly.
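
The effect can be simulated outside Hadoop (a standalone sketch with made-up records): local combining collapses one mapper's pairs into a single record per key before the shuffle.

```java
import java.util.*;

public class CombinerEffect {
    // Simulate one mapper's local combine over (key, "value,count") pairs
    static Map<String, double[]> combine(List<String[]> mapOutputs) {
        Map<String, double[]> combined = new LinkedHashMap<>();
        for (String[] kv : mapOutputs) {
            String[] f = kv[1].split(",");
            double[] acc = combined.computeIfAbsent(kv[0], k -> new double[2]);
            acc[0] += Double.parseDouble(f[0]); // running sum of values
            acc[1] += Integer.parseInt(f[1]);   // running count
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String[]> mapOut = Arrays.asList(
            new String[]{"US", "3,1"}, new String[]{"US", "5,1"},
            new String[]{"JP", "2,1"}, new String[]{"US", "4,1"});
        Map<String, double[]> out = combine(mapOut);
        System.out.println("Map output records: " + mapOut.size());            // 4
        System.out.println("Reduce input records: " + out.size());             // 2
        System.out.println("US mean: " + out.get("US")[0] / out.get("US")[1]); // 4.0
    }
}
```

With real data the ratio is much larger: a mapper that processes thousands of rows for the same country still forwards only one record per country to the shuffle.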