1.Spark内核
1.1核心组件
Driver在Spark作业执行时主要负责:
- 将用户程序转化为作业(Job);
- 在Executor之间调度任务(Task);
- 跟踪Executor的执行情况;
- 通过UI展示查询运行情况;
Executor对象是负责在Spark作业中运行具体任务
- 负责运行组成Spark应用的任务
- 要求缓存的 RDD 提供内存式存储
1.2Spark通用运行流程概述
- 任务提交后,都会先启动Driver程序;
- 随后Driver向集群管理器注册应用程序;
- 之后集群管理器根据此任务的配置文件分配Executor并启动;
- Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,
Taskset中有多个Task,查找可用资源Executor进行调度; - 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。
1.3Standalone模式
Standalone集群有2个重要组成部分,分别是:
1) Master(RM):是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;
2) Worker(NM):是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某个或某些partition;
另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算。
- 在Standalone Cluster模式下,任务提交后,Master会找到一个Worker启动Driver。
- Driver启动后向Master注册应用程序,Master在Worker启动Executor
- Worker上的Executor启动后会向Driver反向注册,
- 所有的Executor注册完成后,Driver开始执行main函数,之后执行到Action算子时,开始划分Stage,每个Stage生成对应的taskSet,之后将Task分发到各个Executor上执行。
在Standalone Client模式下,Driver在任务提交的本地机器上运行
1.4YARN调度
资源调度和分配交给了YARN来处理
YARN-CLENT
YARN-CLUSTER
2.Spark通讯架构
相关知识
-
BIO(Blocking I/O):阻塞式IO
假设去饭店吃饭:老板在给前面先来的人做饭,自己就找个位置坐下等着
-
NIO(New I/O):非阻塞式IO
老板在给前面先来的人做饭,自己去干别的事情,过一段时间来询问老板饭是否做好。干别的事情不安宁,总要记着这个事情。
-
AIO(Asynchronous I/O):异步非阻塞式IO
老板在给前面先来的人做饭,和老板约定好什么时候给我饭,专心干别的事情
Spark基于Netty通信
Driver于Exceutor通信的方式,发件箱和收件箱,发件箱与服务通信
3. Spark任务调度机制
Driver线程主要是初始化SparkContext对象,准备运行所需的上下文
-
一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,
-
另一方面调度任务,将任务下发到Executor上。
资源调度与任务分配
- 当ResourceManager向ApplicationMaster返回Container资源时
- ApplicationMaster就尝试在对应的Container上启动Executor进程,
- Executor进程起来后,会向Driver反向注册,
- 注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。
3.1Spark任务调度概述
Job、Stage以及Task
- 遇到一个Action方法则触发一个Job
- Stage以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分
- 一个Stage对应一个TaskSet
Spark RDD通过其Transactions操作,形成了RDD血缘(依赖)关系图,即DAG,最后通过Action的调用,触发Job并调度执行,
执行过程中会创建两个调度器:DAGScheduler和TaskScheduler。
- DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
- TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet,分发到Executor上执行
Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler
3.2 Spark Stage级调度
当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,根据DAG进行切分,将一个Job划分为若干Stages
- 划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,
- 一类叫做ShuffleMapStage,为下游Stage准备数据
错误重试:
只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。
3.3Spark Task级调度
TaskScheduler会将TaskSet封装为TaskSetManager
TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。
3.3.1调度策略
一种是FIFO:将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager
一种是FAIR:TaskSetMagager进行排序,要排序的TaskSetMagager对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,
综合考量三值进行排序
TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task。
Spark调度总是会尽量让每个task以最高的本地性级别来启动
- 同一个Executor
- 同一个节点
- 同一个机架的两个节点上
3.2 失败重试
- Task被提交到Executor启动执行后,
- Executor会将执行状态上报给SchedulerBackend,
- SchedulerBackend则告诉TaskScheduler,TaskScheduler
- 到该Task对应的TaskSetManager,并通知到该TaskSetManager
TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数
在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上
4. Spark Shuffle解析
1.ShuffleMapStage的结束伴随着shuffle文件的写磁盘。
2.ResultStage对应action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束
4.1HashShuffle
1.未优化的
Task 开始那边各自进行 Hash 计算,每个task得到3个分类
2.优化的
启用合并机制,合并机制就是复用buffer
在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里
然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据)
4.2SortShuffle
1.普通SortShuffle
- 在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中
- 也就是说一个Task过程会产生多个临时文件
- 最后在每个Task中,将所有的临时文件合并,这就是merge过程
- 此过程将所有临时文件读取出来,一次写入到最终文件
意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
2.bypass SortShuffle
触发条件
- shuffle reduce task数量小于触发参数的阈值
- 不是聚合类的shuffle算子
此时task会为每个reduce端的task都创建一个临时磁盘文件
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已
而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销
5.Spark内存管理
5.1堆内内存和堆外内存
堆内内存受到JVM统一管理,
堆外内存是直接向操作系统进行内存的申请和释放。
Spark对堆内内存的规划
-
这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,
-
这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存
-
剩余的部分不做特殊规划
Spark对堆内内存的管理是一种逻辑上的”规划式”的管理,对象实例占用内存的申请和释放都由JVM完成,
Spark只能在申请后和释放前记录这些内存。
申请内存流程如下:
-
Spark 在代码中 new 一个对象实例;
-
JVM 从堆内内存分配空间,创建对象并返回对象引用;
-
Spark 保存该对象的引用,记录该对象占用的内存。
释放内存流程如下:
-
Spark记录该对象释放的内存,删除该对象的引用;
-
等待JVM的垃圾回收机制释放该对象占用的堆内内存。
Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上
可以提升内存的利用率,减少异常的出现。
堆外内存是直接向操作系统进行内存的申请和释放。
为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间
5.2统一内存管理
存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域
双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间
Task在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查Checkpoint 或按照血统重新计算
Driver端的Master负责整个Spark应用程序的Block的元数据信息的管理和维护,而Executor端的Slave需要将Block的更新等状态上报到Master,
同时接收Master 的命令,例如新增或删除一个RDD。
RDD 在缓存到存储内存之后,Partition 被转换成Block,Record在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间
转换为连续存储空间的过程,Spark称之为"展开"(Unroll)。
Spark的存储内存和执行内存有着截然不同的管理方式:
-
对于存储内存来说,Spark用一个LinkedHashMap来集中管理所有的Block,Block由需要缓存的 RDD的Partition转化而成;
-
而对于执行内存,Spark用AppendOnlyMap来存储 Shuffle过程中的数据,在Tungsten排序中甚至抽象成为页式内存管理,开辟了全新的
JVM内存管理机制。
2021.11.27 15:43