Spark on k8s 源码解析执行流程

Spark on k8s 源码解析执行流程

1.通过spark-submit脚本提交spark程序
在spark-submit脚本里面执行了SparkSubmit类的main方法
在这里插入图片描述
2.运行SparkSubmit类的main方法,解析spark参数,调用submit方法
在这里插入图片描述
3.在submit方法里调用doRunMain方法,最终调用runMain方法
在这里插入图片描述
4.在runMain方法里面获取mainClass,再去执行start方法
1、准备提交环境
在这里插入图片描述

2、获取类加载器
在这里插入图片描述
3、判断是否是sparkApplication
在这里插入图片描述

4、执行sparkApplication的start方法

在这里插入图片描述
5.在start方法里面去调用KubernetesClientApplication的start方法
在这里插入图片描述
1、获取k8s的集群域名,调用k8s的客户端,传入spark相关配置参数
在这里插入图片描述
6.调用Client的run方法,通过k8s的apiserver创建Container pod 然后在container中创建driver pod
1、创建configmap
在这里插入图片描述

2、创建driver pod
在这里插入图片描述

7 Driver的启动

上面通过KubernetesClientApplication创建driver之后,会在docker的run的时候,触发ENTRYPOINT或者CMD的命令,作为容器运行的主进程。Spark镜像的ENTRYPOINT是/opt/entrypoint.sh,driver模式下里面的内容基本就是把arg参数传递给/bin/spark-submit,然后指定以client模式再次启动一个SparkSubmit进程。

创建driver的command
在这里插入图片描述
执行driver命令的shell文件
在这里插入图片描述
启动SparkSubmit的源码,和上面分析的一样,只不过这次是以client模式提交的,所以不再会调用到org.apache.spark.deploy.k8s.submit.KubernetesClientApplication,而是直接调用–class后面的作业Class的main方法,在我们的例子中就是直接执行org.apache.spark.examples.SparkPi。

SparkContext创建过程,重点是
1、创建了TaskScheduler(taskSet的调度执行)
在这里插入图片描述

在这里插入图片描述
2、DagScheduler(根据宽窄依赖划分Stage)

在 submitStage 方法中,DAGScheduler 会递归地提交任何缺失的父阶段。这段代码展示了如何处理阶段(Stage)及其依赖关系。
在这里插入图片描述

在这个方法中,DAGScheduler 会根据 ShuffleDependency 创建或获取一个 ShuffleMapStage。如果该 ShuffleMapStage 已存在,它会直接返回,否则会创建一个新的 ShuffleMapStage 并将其加入 shuffleIdToMapStage 中。
在这里插入图片描述

3、HeartbeatReceiver(executor向driver定时发送心跳)。

接收并处理来自 Executor 的心跳消息,确保 Executor 的健康状态。如果 Executor 超时未发送心跳信号,HeartbeatReceiver 将标记其为失效并移除。

这些配置参数就是作用于这个类
spark.network.timeout:配置网络超时时间,默认值为 120s。
spark.executor.heartbeatInterval:配置心跳检查的时间间隔,默认值为 10s。
在这里插入图片描述

4、创建executor
(1)初始Executor数量。如果开启了dynamic allocation,会采用max(spark.dynamicAllocation.minExecutors,spark.dynamicAllocation.initialExecutors,spark.executor.instances)的结果。如果默认未开启的情况下,直接取spark.executor.instances数量。如果均未设置,则取值2。
在这里插入图片描述

(2)Executor的label。和Driver一样,会打上spark-app-selector:{appId}、spark-role:executor。除此之外还有一个spark-exec-id。
在这里插入图片描述

在这里插入图片描述

(3)和Driver一样,解析SparkConf里spark.kubernetes.executor开头的配置项,配置一些Env,Annotation,Volume,Secret等。

在这里插入图片描述

(4)还会配置一些默认的环境变量env。
在这里插入图片描述

8 Executor的启动

从entrypoint脚本可以看到,Executor模式下,启动的主类是org.apache.spark.executor.CoarseGrainedExecutorBackend。
在这里插入图片描述
在这里插入图片描述

创建一个和Driver的netty临时网络连接,从Driver拿到sparkProperties。Driver的地址来自于启动参数的–driver-url,对应配置spark.driver.host。
在这里插入图片描述

RpcEndpointRef初始化时,Executor会向Driver发送类型为RegisterExecutor的注册消息。这样Driver就能知道每个Exectuor的地址用于通信了。

随后Executor会等待Driver发送的LaunchTask类型的消息,收到后会反序列化并在线程池中执行具体的Task。

在这里插入图片描述
当CoarseGrainedExecutorBackend收到LaunchTask的命令的时候,会调用executor里面的launchTask方法执行任务。
在这里插入图片描述

Executor的主线程会一直等待,直到Driver发来StopExecutor的消息才会退出。StopExecutor一般来说会在Driver退出或者SparkContext关闭时触发。
在这里插入图片描述

9 总结

这就是spark on k8s模式源码分析的整个流程。

上一篇:什么是 JVM( Java 虚拟机),它在 Java 程序执行中扮演什么角色?


下一篇:[论文笔记] 自对齐指令反翻译:SELF-ALIGNMENT WITH INSTRUCTION BACKTRANSLATION