Spark核心思想

一、Spark的两种核心Shuffle

  Shuffle涉及磁盘的读写和网络的IO,因此shuffle性能的高低直接影响整个程序的性能。Spark也有map阶段和reduce阶段,因此也有Shuffle。

  1)基于hash的shuffle

    在每个Map阶段的task会为每个reduce阶段的task生成一个文件,通常会产生大量的文件(m * r)伴随着大量的随机磁盘IO与大量内存的开销。

    为了解决产生大量文件的问题,在0.8.1版本引入Shuffle Consolidate机制(文件合并机制),将Mapper端生成的中间文件进行合并的处理机制,减少中间文件的生成数量。即为每个reduce task生成一个文件。

  2)基于sort的shuffle

    基于hash的shuffle往往依赖于reduce的个数,不容易空值,在1.1版本引入了基于sort的shuffle。在基于sort的shuffle中,不再会为reduce的task生成单独的文件,而是全部写入一个数据文件中,同时会生成索引文件,reduce阶段的task通过索引文件获取相关数据,最终生成的文件为2 * M,M为map阶段的task数。

  3)基于sort的shuffle解决了在hash时产生大量文件的问题,当数据量越来越多,产生的文件是不可控,严重影响了Spark的性能和扩展能力;但基于sort的shuffle并不是完美的,缺点在于在map端排序需要耗费时间,后面出现了Tungsten-sort shuffle,对排序算法进行改进,优化了排序的速度。

 

二、spark运行流程

  1、SC向资源管理器注册并向资源管理器申请运行Executor;

  2、资源管理器分配Excutor,然后资源管理器启动Executor,并发送心跳到资源管理器;

  3、SC构建DAG有向无环图;

  4、将DAG分解为Stage,将stage发送给TaskScheduler;

  5、Excutor向SC申请task, TaskScheduler将task发送给Excutor运行;

  6、SC将应用程序发送给Excutor运行,运行完毕后释放资源

 

三、Spark数据倾斜

  1、预聚合原始数据:在上游数据源进行预聚合,一般读取的是hive中的数据,hive中进行过滤和预聚合;

  2、预处理导致倾斜的key:热key分离,随机加盐;sample采样(或者countByKey)对倾斜key单独进行join;

    注:在spark中,如果某一个RDD只有一个key,在shuffle过程中会默认将此key对应的数据打散,由不同reduce端task进行处理。

  3、提高reduce的并行度(partition):spark.sq;.shuffle.partition 或者 reduceByKey(200)

  4、使用map join

上一篇:springboot+aop+自定义注解,打造通用的全局异常处理和参数校验(注解版)


下一篇:Collections.shuffle()方法