MaxCompute MapReduce的7个性能优化策略

1. 输入表的列裁剪

对于列数特别多的输入表,Map阶段处理只需要其中的某几列,可以通过在添加输入表时明确指定输入的列,减少输入量;
例如只需要c1,c2俩列,可以这样设置:

InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);

设置之后,你在map里的读取到的Record也就只有c1,c2俩列,如果之前是使用列名获取Record数据的,不会有影响,而用下标获取的需要注意这个变化。

2. 减少中间环节

如果有多个MR作业,之间有关联关系,前一个作业的输出是后一个作业的输入,可以考虑采用Pipeline的模式,将多个串行的MR作业合并为一个,这样可以用更少的作业数量完成同样的任务,一方面减少中间落表造成的的多余磁盘IO,提升性能;另一方面减少作业数量使调度更加简单,增强流程的可维护性。具体使用方法参见Pipeline示例

3. 避免资源重复读取

资源的读取尽量放置到setup阶段读取,避免资源的多次读取的性能损失,另外系统也有64次读取的限制,资源的读取参见使用资源示例

4. 减少对象构造开销

对于Map/Reduce阶段每次都会用到的一些java对象,避免在map/reduce函数里构造,可以放到setup阶段,避免多次构造产生的开销;

{
    ...
    Record word;
    Record one;

    public void setup(TaskContext context) throws IOException {


      // 创建一次就可以,避免在map中每次重复创建
      word = context.createMapOutputKeyRecord();

      one = context.createMapOutputValueRecord();

      one.set(new Object[]{1L});

    }
    ...
}

5. 合理选择partition column或自定义partitioner

合理选择partition columns,可以使用JobConf#setPartitionColumns这个方法进行设置(默认是key schema定义的column),设置后数据将按照指定的列计算hash值分发到reduce中去, 避免数据倾斜导致作业长尾现象,如有必要也可以选择自定义partitioner,自定义partitioner的使用方法如下:

import com.aliyun.odps.mapred.Partitioner;

public static class MyPartitioner extends Partitioner {

@Override
public int getPartition(Record key, Record value, int numPartitions) {
  // numPartitions即对应reducer的个数
  // 通过该函数决定map输出的key value去往哪个reducer
  String k = key.get(0).toString();
  return k.length() % numPartitions;
}
}

在jobconf里进行设置:

jobconf.setPartitionerClass(MyPartitioner.class)

另外需要在jobconf里明确指定reducer的个数:

jobconf.setNumReduceTasks(num)

6. 合理使用combiner

如果map的输出结果中有很多重复的key,可以合并后输出,combine后可以减少网络带宽传输和一定shuffle的开销,如果map输出本来就没有多少重复的,就不要用combiner,用了反而可能会有一些额外的开销。combiner实现的是和reducer相同的接口,例如一个WordCount程序的combiner可以定义如下:

/**
   * A combiner class that combines map output by sum them.
   */
  public static class SumCombiner extends ReducerBase {

    private Record count;

    @Override
    public void setup(TaskContext context) throws IOException {
      count = context.createMapOutputValueRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long c = 0;
      while (values.hasNext()) {
        Record val = values.next();
        c += (Long) val.get(0);
      }
      count.set(0, c);
      context.write(key, count);
    }
  }

7. 设置合理的split size

map默认的split size是256MB,split size的大小决定了map的个数多少,如果用户的代码逻辑比较耗时,map需要较长时间结束,可以通过JobConf#setSplitSize方法适当调小split size的大小。然而split size也不宜设置太小,否则会占用过多的计算资源。


欢迎加入“数加·MaxCompute购买咨询”钉钉群(群号: 11782920)进行咨询,群二维码如下:

MaxCompute MapReduce的7个性能优化策略 
上一篇:nginx+fastcgi安装配置


下一篇:歪理邪说解析架构设计师上午考试试题之五(分析2010下半年系统架构设计师上午试题21-25题)