前言
spark在提交任务到集群后,会有一系列的处理,
从根据任务代码构建DAG,再到根据shuffle算子切分stage,
然后生成taskset提交到集群管理进行调度等等,
都需要一个比较清晰的流程梳理,便于问题定位,也利于性能优化
流程
- 通过测试和观察spark ui,spark会将依赖jar提前上传至hdfs目录,用于后续executor的分发和使用
- 流程机构图【基于 yarn-cluster模式)
流程解析
准备
- 1.spark client通过spark-submit或API,进行任务提交
- 2.RM收到任务提交请求后,进行任务记录,分配起application_id(在yarn web ui对应),并在hdfs目录分配目录用于新任务上传必须资源([hdfs:///user/用户/.sparkStaging/applicationId])
- 3.spark client收到响应后,根据application_id上传任务运行所需依赖到为其分配的hdfs目录,以及spark配置参数,应用代码
- 4.上传依赖后,准备就绪,想RM申请资源运行任务
执行
- 5.RM收到请求,若资源队列存在可分配NM,则向可用NM发送请求,NM接受到请求后,创建container。NM创建container成功后,向RM发送响应
- 6.RM收到响应,提示spark client可以运行任务
- 7.spark client收到RM应答后,发送运行的命令到NM所在的container中,启动application master[AM负责任务的协调]
- 8.application master从hdfs中该application对应的hdfs目录获取上传jar,配置文件,依赖包,创建spark Driver进程
- 9.根据spark集群配置参数,application master向RM申请 NM容器,启动Executor对应的容器
- 10.RM调度空闲、可用的NM创建container
- 11.application master获取NMcontainer后,发送启动Executor进程的命令,Executor启动后向Driver进行反向注册,用于心跳检测和计算结果返回
- 12.Driver进程会解析spark执行,执行main函数,后续DAGScheduler进行一系列DAG构建,然后进行stage划分后,每个stage生成taskset,由taskScheduler提交到cluster manager,最后提交yarn集群
- 13.由于Executor已在Driver注册,因此Driver会将task分配到Executor中执行
- 14.executor分配到对应的任务进行执行,执行完毕向driver端发送结果,driver端根据结果进行聚合;Driver会根据Executor执行情况,会决定是否进行推测执行,对运行较慢的Executor,开启新的Executor执行该任务。
结束
- 15.Executor结束
- 16.Driver结束
- 17.Application结束,Application Master向RM注销
- 18.RM、NM继续接受下一个任务
总结
- 通过流程梳理,我们发现如果spark任务运行失败,检查配置参数是否正确,jar包[业务代码,依赖]是否上传值该Application对应的hdfs目录,进而根据代码进行分析