Spark的shuffle流程

1、shuffle流程演变

  • Spark 0.8及以前 Hash Based Shuffle
  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出历史舞台

2、Hash Based Shuffle

  • 未引入Consolidation前
    • 从MapTask到ReduceTask,每个MapTask会产生和reduce任务数量相等的小文件,也就是小文件的数量等同于 m*r,这种方式会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量
  • 引入Consolidation后
    • 把同一个core中运行的Map任务生成的文件合并到一个文件中,生成小文件的数量变为 core*r
    • 同一个core中先后输出的文件,对应到同一个文件中不同的segment上,合并在一起称为FileSegment,形成一个ShuffleBlockFile

3、Sort Based Shuffle

  • map端的任务会根据分区id和key进行对数据进行排序,然后讲所有数据写入到一个文件中,并生成一个索引文件

4、Shuffle Writer的三种方式

  • Shuffle Writer有ByPassMergeSortShuffleWriter、UnSafeShuffleWriter、SortShuffleWriter
  • 三种Shuffle Writer的选择方式:
    • 1、首先判断map端是否开启mapSideCombiner,并且判断分区数量是否小于spark.shuffle.sort.byPassMerge.Threshold(默认为200),如果条件满足,使用ByPassMergeSortShuffleWriter
    • 2、如果不满足上述两个条件的任意一个,判断serializer是否支持relocation,并且判断是否十四用aggregator和分区数是否小于16777215,如果条件都满足,使用UnSafeShuffleWriter,否则使用SortShuffleWriter
上一篇:微信红包随机算法转载


下一篇:阿里云Spark Shuffle的优化