【Spark 深入学习 04】再说Spark底层运行机制

本节内容

· spark底层执行机制

· 细说RDD构建过程

· Job Stage的划分算法

· Task最佳计算位置算法

一、spark底层执行机制

对于Spark底层的运行原理,找到了一副很好的图,先贴上

【Spark 深入学习 04】再说Spark底层运行机制

客户端提交应用后,spark是如何执行的要有一个整体的概念,做到心中有数,先整体把握,才能更好的分模块开垦细节,废话不多说,先来看该图如何更好的理解。

1)提交前的联系

Worker向Master或则ResourceManager汇报自己有哪些资源(内存、CPU、磁盘空间、网络等),Master或则ResourceManager与Worker一直保持心跳

2)应用提交后

Spark通过RDD对分布式的数据进行管理,RDD记录了转换成“spark格式”后的数据分区(记录数据的存储位置)和数据分区对应的操作

· 应用提交后,形成RDD Graph,并且在后台创建DAG对象(spark不仅仅用DAG建模,而且还会执行它,并且里面不是用对象表示,而是用RDD对象之间的关系)

· DAG Scheduler 优先使用pipeline方法,把RDD的transformation压缩,当碰到wide transformation 时,narrow无法和wide pipeline,那DAG scheduler会把前面的transformation定义成一个stage,DAG Scheduler的工作结果就是将RDD产生一组stages

· 将DAG Scheduler产生的stages传送给task scheduler,task scheduler使用集群管理器依次执行task,task被分配到各个work下执行,当所有的task执行完毕,一个stage标记完成,再运行下一个stage,直到整个spark job完成。

简单理解, Spark 把要处理的数据,处理中间结果,和输出结果都定义成 RDD. 这样一个常见的 Spark job 就类似于:

• 从数据源读取数据,把输入生成一个 RDD;

• 通过运算把输入 RDD 转换成另一个RDD;

• 再通过运算把生成的 RDD 转换成另一个RDD,重复需要进行的 RDD 转换操作 (此处省略一千遍);

• 最后运算成结果 RDD,处理结果;

【Spark 深入学习 04】再说Spark底层运行机制

Spark的运行流程: Client提交应用,master找到一个worker启动driver[也可以其他],driver向master请求资源,之后将应用转化为RDD Graph,再由DAGScheduler将RDD Graph转换为stage的DAG提交给TaskScheduler,由TaskScheduler提交任务给executor。

从调度来看,经历了如下调度:application调度 -> Job调度 -> Stage调度 -> Task调度

二、细说RDD构建过程

从前面的学习我们发现 RDD 其实就是数据集,是一组数据被处理到一个阶段的状态。

每一个 Spark Job 就是定义了由输入 RDD,如何把它转化成下一个状态,再下一个状态 …… 直到转化成我们的输出。这些转化就是对 RDD 里每一个 data record 的操作。用个高大上点的语言,一个 Spark job 就是一系列的 RDD 以及他们之间的转换关系。那么用户如何才能定义 RDD 和转换关系呢?换句话说,用户如何使用 Spark 呢?

用户需要定义一个包含主函数的 Java (main) 类。在这个 main 函数中,无论业务逻辑多么复杂,无论你需要使用多少 Java 类,如果从 Spark 的角度简化你的程序,那么其实就是:

• 首先生成 JavaSparkContext 类的对象.

• 从 JavaSparkContext 类的对象里产生第一个输入RDD. 以读取 HDFS 作为数据源为例,调用 JavaSparkContext.textFile() 就生成第一个 RDD.

• 每个 RDD 都定义了一些标准的常用的变化,比如我们上面提到的 map, filter, reduceByKey …… 这些变化在 Spark 里叫做 transformation.

• 之后可以按照业务逻辑,调用这些函数。这些函数返回的也是 RDD, 然后继续调用,产生新的RDD …… 循环往复,构建你的 RDD 关系图。

• 注意 RDD 还定义了其他一些函数,比如 collect, count, saveAsTextFile 等等,他们的返回值不是 RDD. 这些函数在 Spark 里叫做 actions, 他们通常作为 job 的结尾处理。

• 用户调用 actions 产生输出结果,Job 结束。

Action 都是类似于 “数数这个 RDD 里有几个 data record”, 或者 ”把这个 RDD 存入一个文件” 等等。想想他们作为结尾其实非常合理:我们使用 Spark 总是来实现业务逻辑的吧?处理得出的结果自然需要写入文件,或者存入数据库,或者数数有多少元素,或者其他一些统计什么的。所以 Spark 要求只有用户使用了一个 action,一个 job 才算结束。当然,一个 job 可以有多个 action,比如我们的数据既要存入文件,我们又期望知道有多少个元素。

这些 RDD 组成的关系在 Spark 里叫做 DAG,就是有向无循环图,图论里的一个概念,大家有兴趣可以专门翻翻这个概念。可以发现,实践中绝大部分业务逻辑都可以用 DAG 表示,所以 spark 把 job 定义成 DAG 也就不足为奇了。

RDD 的两种变化

我们上面刚刚介绍了 transformation 的概念。在 Spark 眼中,transformation 被分成 narrow transformation 和 wide transformation. 这是什么东西呢?

上文提到过 RDD 被分成几个分区,分散在多台机器上。当我们把一个 RDD A 转化成下一个 RDD B 时,这里有两种情况:

· 有时候只需要一个 A 里面的一个分区,就可以产生 B 里的一个分区了,比如 map 的例子:A 和 B 之间每个分区是一一对应的关系,这就是 narrow transofmration.

· 还有一类 transformation,可能需要 A 里面所有的分区,才能产生 B 里的一个分区,比如 reduceByKey的例子,这就是 wide transformation.

Narrow 或者 Wide 有什么关系吗?

一个 Spark job 中可能需要连续地调用 transformation, 比如先 map,后 filter,然后再 map …… 那这些 RDD 的变化用图表示就是:

【Spark 深入学习 04】再说Spark底层运行机制

我们可以大胆设想一下,如果每个分区里的数据就待在那台机器的内存里,我们逐一的调用 map, filter, map 函数到这些分区里,Job 就很好的完成。

更重要的是,由于数据没有转移到别的机器,我们避免了 Network IO 或者 Disk IO. 唯一的任务就是把 map / filter 的运行环境搬到这些机器上运行,这对现代计算机来说,overhead 几乎可以忽略不计。

这种把多个操作合并到一起,在数据上一口气运行的方法在 Spark 里叫 pipeline (其实 pipeline 被广泛应用的很多领域,比如 CPU)。这时候不同就出现了:只有 narrow transformation 才可以进行 pipleline 操作。对于 wide transformation, RDD 转换需要很多分区运算,包括数据在机器间搬动,所以失去了 pipeline 的前提。

RDD 的执行

当用户调用 actions 函数时,Spark 会在后台创建出一个 DAG. 就是说 Spark 不仅用 DAG 建模,而且真正地创建出一个 DAG, 然后执行它(顺便说一句 DAG 在 Spark 里不是用一个对象表示的,而是用 RDD 对象之间的关系)。

【Spark 深入学习 04】再说Spark底层运行机制

Spark 会把这个 DAG 交给一个叫 DAG scheduler 的模块,DAG scheduler 会优先使用 pipeline 方法,把 RDD 的 transformation 压缩;当我们遇到 wide transformation 时,由于之前的 narrow transformation 无法和 wide transformation pipeline, 那 DAG scheduler 会把前面的 transformation 定义成一个 stage.

重要的事情说三遍:DAG scheduler 会分析 Spark Job 所有的 transformation, 用 wide transformation 作为边界,把所有 transformation 分成若干个stages. 一个 stage 里的一个分区就被 Spark 叫做一个task. 所以一个 task 是一个分区的数据和数据上面的操作,这些操作可能包括一个 transformation,也可能是多个,但一定是 narrow transformation.

DAG scheduler 工作的结果就是产生一组 stages. 这组 stages 被传到 Spark 的另一个组件 task scheduler, task scheduler 会使用集群管理器依次执行 task, 当所有的 task 执行完毕,一个 stage 标记完成;再运行下一个 stage …… 直到整个 Spark job 完成。

【Spark 深入学习 04】再说Spark底层运行机制

三、Job Stage的划分算法

从前文了解到的处理流程,RDD Graph->DAG Scheduler->Task Scheduler,DAG Scheduler将RDD转换为Job Stage。

由于Spark的算子构建一般都是链式的,这就涉及了要如何进行这些链式计算,Spark的策略是对这些算子,先划分Stage,然后在进行计算。

由于数据是分布式的存储在各个节点上的,所以为了减少网络传输的开销,就必须最大化的追求数据本地性,所谓的数据本地性是指,在计算时,数据本身已经在内存中或者利用已有缓存无需计算的方式获取数据。

1.  Stage划分算法思想

(1)一个Job由多个Stage构成

一个Job可以有一个或者多个Stage,Stage划分的依据就是宽依赖,产生宽依赖的算子:reduceByKey、groupByKey等等

(2)根据依赖关系,从前往后依次执行多个Stage

SparkApplication 中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是有一个或者多个Stage构成,后面的Stage依赖前面的Stage,也就是说只有前面的Stage计算完后,后面的Stage才会运行。

(3)Stage的执行时Lazy级别的

所有的Stage会形成一个DAG(有向无环图),由于RDD的Lazy特性,导致Stage也是Lazy级别的,只有遇到了Action才会真正发生作业的执行,在Action之前,Spark框架只是将要进行的计算记录下来,并没有真的执行。Action导致作业执行的代码如下:触发作业,发送消息。消息的接收和处理:

(1)DAGScheduler启动一个线程EventLoop(消息循环器),不断地从消息队列中取消息。消息是通过EventLoop的put方法放入消息队列,当EventLoop拿到消息后会回调DAGScheduler的OnReceive,进而调用doOnReceive方法进行处理。

为什么要开辟线程来执行消息的读、取?这样可以提交更多的Job,异步处理多Job,处理的业务逻辑一致(调用自己方法也是发送消息),解耦合,扩展性好。

(2)在doOnReceive中通过模式匹配的方式把JobSubmitted封装的内容路由到handleJobSubmitted。

(3)在handleJobSubmitted中首先创建finalStage。

(4)通过递归的方式创建DAG。

四、Task最佳计算位置算法

1.Task任务本算法运用场景

在上一节,我们介绍了Job Stage划分算法,并最终得到了DAG图中的Result Stage(final Stage)。接下来我们通过查看Task任务本地性(为了保证Data Locality)的运用场景----Task的运行调度处理,来引入Task任务本地性算法。

在得到逻辑上Result Stage,Spark为了进行计算就必须先报任务以一定的集群可识别形式提交给集群进行计算。Spark的任务提交过程如下:

(1)生成ActiveJob,为提交finalStage做准备。

(2)提交finalStage

提交Stage,如果有未提交的ParentStage,则会递归提交这些ParentStage,只有所有ParentStage都计算完了,才能提交当前Stag

(3)提交MissingTask

missingTask会最先会再到需要计算的分片,然后对Stage的运行环境进行设定,然后取得Task计算的本地性级别,最后会根据这些信息建立Tasks来处理每个分片,在提交给底层TaskScheduler之前,Spark还会将Tasks封装成TaskSet。最后提交TaskSet给TaskScheduler,等待TaskScheduler最终向集群提交这些Task,并且DAGScheduler会监听这些Task的状态。

2.数据本地性

(1)这里我们来着重讲解获取数据本地性部分的代码:

【Spark 深入学习 04】再说Spark底层运行机制

这里会将要计算的分片(Partition)转换为(id, getPreferredLocs(stage.rdd, id)) 类型的truple,进而由truple转换未一个Map映射,在Task构造时需要一个locs参数,便可以利用这个映射由id得到相应Partition的本地性级别。

在每个分片(Partition)内部则是通过getPreferredLocs方法得到的

在具体算法实现的时候,首先查询DAGScheduler的内存数据结构中是否存在当前partition的数据本地性信息,若有的话就直接放回该信息;若没有首先会调用rdd.getPreferredLocations来得到数据的本地性。

例如想让Spark运行在Hbase上或者是一种现在Spark还没有直接支持的数据库上,此时开发者需要自定义RDD,为了保证Task计算的数据本地性,最为关键的方式就是必须实现RDD的getPreferredLocations方法,来支持各种来源的数据。

DAGScheduler计算数据本地性时,巧妙的借助了RDD自身的getPreferredLocations中的数据,最大化的优化效率,因为getPreferredLocations中表明了每个Partition的数据本地性。虽然当然Partition可能被persist或checkpoint,但是persist或checkpoint默认情况下肯定和getPreferredLocations中的partition的数据本地性是一致的。所以,这中算法就极大的简化了Task数据本地性算法的实现,并且优化了效率

五、参考资料

1.http://mp.weixin.qq.com/s/nDRt1VQTYmsYcW98q4cvEQ-五分钟深入 Spark 运行机制

2.http://blog.csdn.net/sinat_25306771/article/details/51429984

上一篇:Session Tracking Approaches


下一篇:jsp action中附件下载的写法