一、RDD依赖
一、为什么要设计宽窄依赖
- 窄依赖
- Spakr可以并行计算
- 如果有一个分区数据丢失,主需要从父RDD的对应1个分区重新计算即可,不需要重新计算整个任务,提高容错
- 宽依赖
- 宽依赖是划分Stage的依据
- 构建Lineage血缘关系
- RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列Lineage记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区
二、窄依赖
-
窄依赖中,父RDD和子RDD间的分区是一对一的。换句话说父RDD中,一个分区内的数据是不能被分割的,只能由RDD中的一个分区整个利用。
-
上图中P代表RDD中的每个分区,我们看到,RDD中每个分区内的数据在上面的几种转移操作之后被一个分区所使用,即其依赖的父分区只有一个。比如图中的map、union和join操作,都是窄依赖的。
注意:join操作比较特殊,可能同时存在宽、窄依赖
三、Shuffle依赖(宽依赖)
-
Shuffle依赖一会打乱原RDD结构的操作。具体来说,父RDD中的分区可能会被多个子RDD分区使用。因为父RDD中一个分区内的数据会被分割并发送给子RDD的所有分区,因此Shuffle依赖也意味着父RDD与子RDD之间存在着Shuffle过程
-
上图中P代表RDD中对的多个分区,我么会发现对于Shuffle类操作而言,结果RDD中的每个分区可能会依赖多个父RDD中的分区。需要说明的是,依赖关系是RDD到RDD之间的一种映射关系,是两个RDD之间的依赖,如果在一次操作中设计多个父RDD,也有可能同时包含窄依赖和Shuffle依赖
四、如何区分宽窄依赖
区分RDD之间的依赖为宽依赖还是窄依赖,主要在于父RDD分区数据与子RDD分区数据关系
- 窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
- 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖,设计Shuffle
二、DAG和Stage
一、什么是DAG
- 在图论中,如果一个有向图无法从任意顶点触发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。而在Spark中,由于计算过程很多时候会有先后顺序,受制于某些任务必须比另一些任务较早执行的限制,必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是DAG图,每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系)
二、DAG如何划分Stage?
-
Spark中DAG生成过程的重点是对Stage的划分,其划分的依据是RDD的依赖关系,对于不同的依赖关系,高层调度器会进行不同的处理
- 对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage
- 对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分
-
在Spark中,DAG生成的流程关键在于回溯,在程序提交后,高层调度器将所有的RDD看成是一个Stage,然后对此Stage进行从后往前的回溯,遇到Shuffle就断开,遇到窄依赖,则归并到同一个Stage。的能到所有的步骤回溯完成,便生成一个DAG图
-
为什么要划分Stage?–并行计算
- 一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖进行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行
- pipeline:HDFS----textRDD----splitRDD----tupleRDD
三、Spark Shuffle
一、Spark的Shuffle简介
-
Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化和反序列化、跨节点网络IO以及磁盘读写IO等
-
Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步
-
执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。
四、HashShuffle详解
一、Shuffle阶段划分:
- shuffle write:mapper阶段,上一个stage得到最后的结果写出
- shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并
二、未经优化的hashShuffleManager:
- HashShuffle是根据task的计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分,这样保证相同的数据一定放入一个分区,Hash Shuffle过程如下:
-
根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,再将block文件进行合并。
-
未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。
三、经过优化的hashShuffleManager:
- 在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,每一个Group磁盘文件的数量与下游stage的task数量是相同的。
四、数量对比
- 未经优化:
- 上游的task数量:m
- 下游的task数量:n
- 上游的executor数量:k (m>=k)
- 总共的磁盘文件:m*n
- 优化之后的:
- 上游的task数量:m
- 下游的task数量:n
- 上游的executor数量:k (m>=k)
- 总共的磁盘文件:k*n
五、SortShuffleManager详解
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
一、SortShuffle的普通机制
- 该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
- 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
- 排序
- 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
- 溢写
- 排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
- merge
- 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程。
- 由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
二、Sort shuffle的bypass机制
- bypass运行机制的触发条件如下:
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
- 不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
- 此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
- 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
- 而该机制与普通SortShuffleManager运行机制的不同在于:
- 第一,磁盘写机制不同;
- 第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
三、总结
- SortShuffle也分为普通机制和bypass机制
- 普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。
- 而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。
六、Spark Shuffle的配置选项(配置调优)
一、spark 的shuffle调优
主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等
二、spark.shuffle.file.buffer
-
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
-
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
三、spark.reducer.maxSizeInFlight:
-
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)
-
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
四、spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait:
-
spark.shuffle.io.maxRetries :shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)
-
spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
-
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
五、spark.shuffle.memoryFraction:
- 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作内存比例。
六、spark.shuffle.manager
- 参数说明:该参数用于设置shufflemanager的类型(默认为sort).Spark1.5x以后有三个可选项:
- Hash:spark1.x版本的默认值,HashShuffleManager
- Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
七、spark.shuffle.sort.bypassMergeThreshold
-
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
-
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
七、job调度流程
-
Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行
-
DAGScheduler负责Stage级的调度,主要是将DAG切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度
-
TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照规定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不通的资源管理系统
-
-
Spark的任务调度总提来说分两路进行,一路是Stage级的调度,一路是Task级的调度
-
一个Spark应用程序包括Job、Stage及Task:
- 第一:Job是以Action方法为界,遇到一个Action方法则触发一个Job;
- 第二:Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
- 第三:Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task
八、Spark并行度
一、资源并行度与数据并行度
- 在Spark Application运行时,并行度可以从两个方面理解:
- 资源的并行度:由节点数(executor)和cpu数(core)决定的
- 数据的并行度:task的数据,partition大小
- task又分为map时的task和reduce(shuffle)时的task;
- task的数目和很多因素有关,资源的总core数,spark.default.parallelism参数,spark.sql.shuffle.partitions参数,读取数据源的类型,shuffle方法的第二个参数,repartition的数目等等。
- 如果Task的数量多,能用的资源也多,那么并行度自然就好。如果Task的数据少,资源很多,有一定的浪费,但是也还好。如果Task数目很多,但是资源少,那么会执行完一批,再执行下一批。所以官方给出的建议是,这个Task数目要是core总数的2-3倍为佳。如果core有多少Task就有多少,那么有些比较快的task执行完了,一些资源就会处于等待的状态。
二、设置Task数量
将Task数量设置成与Application总CPU Core 数量相同(理想情况,150个core,分配150 Task)官方推荐,Task数量,设置成Application总CPU Core数量的2~3倍(150个cpu core,设置task数量为300~500)与理想情况不同的是:有些Task会运行快一点,比如50s就完了,有些Task可能会慢一点,要一分半才运行完,所以如果你的Task数量,刚好设置的跟CPU Core数量相同,也可能会导致资源的浪费,比如150 Task,10个先运行完了,剩余140个还在运行,但是这个时候,就有10个CPU Core空闲出来了,导致浪费。如果设置2~3倍,那么一个Task运行完以后,另外一个Task马上补上来,尽量让CPU Core不要空闲。
三、设置Application的并行度
参数spark.defalut.parallelism默认是没有值的,如果设置了值,是在shuffle的过程才会起作用
if __name__ == '__main__':
print('PySpark First Program')
# 输入数据
data = ["hello", "world", "hello", "world"]
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
conf.set("spark.defalut.parallelism", 4)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# sc = SparkContext.getOrCreate(conf)
sc = SparkContext(conf=conf)
# 将collection的data转为spark中的rdd并进行操作
rdd = sc.parallelize(data)
# rdd = sc.textFile("file:///export/pyfolder1/pyspark-chapter02_3.8/data/word.txt") \
# .flatMap(lambda line: line.split(" "))
print("rdd numpartitions:", rdd.getNumPartitions())
# 执行map转化操作以及reduceByKey的聚合操作
res_rdd = rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 并行度决定了可以同时处理多少个分区
print("shuffle numpartitions:", res_rdd.getNumPartitions())
print('停止 PySpark SparkSession 对象')
sc.stop()
九、Spark中的CombineByKey
combineByKey是Spark中一个比较核心的高级且底层函数,其他一些高阶键值对函数底层都是用它实现的。诸如 groupByKey,reduceByKey等等
如下解释下3个重要的函数参数:
- createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
- mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
- mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)。
案例一:实现将相同Key的Value进行合并,使用groupBy很容易实现
# -*- coding: utf-8 -*-
# Program function:外部集合转为RDD
from pyspark import SparkConf, SparkContext
import re
# 1-准备环境
conf = SparkConf().setAppName("collection").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
return [a]
def append(a, b):
a.append(b)
return a
def extend(a, b):
a.extend(b)
return a
print(sorted(x.combineByKey(to_list, append, extend).collect()))
#[('a', [1, 2]), ('b', [1])]
- 作用
- 对数据集按照 Key 进行聚合
- 调用
- combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])
- 参数
- createCombiner 将 Value 进行初步转换
- mergeValue 在每个分区把上一步转换的结果聚合
- mergeCombiners 在所有分区上把每个分区的聚合结果聚合
- partitioner 可选, 分区函数
- mapSideCombiner 可选, 是否在 Map 端 Combine
- serializer 序列化器
- 注意点
- combineByKey 的要点就是三个函数的意义要理解
- groupByKey, reduceByKey 的底层都是 combineByKey
案例二:求平均分的案例代码
# -*- coding: utf-8 -*-
# Program function:外部集合转为RDD
from pyspark import SparkConf, SparkContext
import re
# 1-准备环境
conf = SparkConf().setAppName("collection").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)])
# (v)=>(v,1),得到的是(88,1),因为这是combineByKey是按照key处理value操作,
# acc:(Int,Int)代表的是(88,1),其中acc._1代表的是88,acc._2代表1值,v代表是同为Fred名称的95的数值,
# 所以acc._1+v=88+95,即相同Key的Value相加结果,第三个参数是分区间的相同key的value进行累加,
# 得到Fred的88+95+91,Wilma累加和为93+95+98。
def createCombiner(a):
return [a, 1]
def mergeValue(a, b):
return [a[0] + b, a[1] + 1]
def mergeCombiners(a, b):
return [a[0] + b[0], a[1] + b[1]]
resultKey = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(sorted(resultKey.collect()))
# [('Fred', [274, 3]), ('Wilma', [286, 3])]
print(resultKey.map(lambda score: (score[0], int(score[1][0]) / int(score[1][1]))).collect())
# [('Fred', 91.33333333333333), ('Wilma', 95.33333333333333)]
#lambda表达式版本
resultKey = x.combineByKey(lambda x:[x,1], lambda x,y:[x[0]+y,x[1]+1], lambda x,y:[x[0]+y[0],x[1]+y[1]])
print(sorted(resultKey.collect()))