大家元旦快乐,牛年发发发~~~~牛气冲天o(* ̄︶ ̄*)o
spark粗略流程简述
(1)有算子触发Action,Driver端和hdfs的namenode进行通信,询问元数据信息。根据元数据信息 及相应切分规则切分任务切片,计划分区(task),并向Master申请相应的资源
(2)Master收到Driver的交互信息,并根据自己所管理的Worker节点,决定在哪个Worker上启动Executor(手残上面图中executor写掉了个e)
(3)在Worker启动了本次应用所需要的Executor之后,Executor会向Driver端反向注册,告诉Driver我准备好了,我可以运行你的任务
(4)Driver在准备工作做完了之后,生成相应的task,并由task schedule 任务调度器来决定哪个task发送到哪个executor上执行
(5)executor收到相应的task,开始执行任务
(6)任务执行完成之后,输出到hdfs端,或收集到driver端进行再次操作统计
解析点:
1)Master负责资源的分配,可以决定在哪个Worker上启动Executor
2)Driver(准确来说是task schedule)负责任务的调度,可以决定这个任务给发送到哪个executor上运行
3)程序的前期大部分工作在Driver中进行,真正执行代码逻辑的在Executor中
4)一个Application在一个Worker上只能启动一个Executor,但不同应用在一个Worker上可以启动多个Executor
5)一个Executor中可以并行运行多个task,实际上来说,是一个Executor中有线程池,一个task就是一个线程,这个线程负责来执行task中的代码运算逻辑
名词解释
master
1)负责整个集群Worker的管理(接收worker的注册信息和心跳、移除异常的worker)
2)负责接收提交的任务
3)负责资源调度
4)是一个java进程
5)命令worker启动executor
worker
1)负责管理所在节点的资源
2)向master注册信息,并定期发送心跳报活
3)负责启动executor
4)监控executor的状态
5)是一个java进程
executor
1) 向Driver反向注册
2)负责接收Driver端生成的task,并执行task,将其放入到线程池中运行
3)是一个java进程
Driver
1)Driver(SparkContext),负责将用户编写的代码转化为tasks,然后调度到(实际上是driver里面的task schedule)executor中执行
注:我们写好的代码,并不是立即执行,而是一个计划,这是由Driver负责的,就好比Driver是旅游出行的计划者,做好了旅游攻略,但还没有执行(出行旅游),而真正执行任务、干活或者说做出行旅游动作的是我们的executor
2)接收executor端监控的执行的task的状态和进度信息
Application
1)使用SparkSubmit提交的计算机应用
2)一个Application中可以触发一到多次Action,触发一次Action形成一个DAG,一个DAG对应着一个Job
3)一个Application中可以有1到多个DAG(Job)
图解:
linux01 作为master、worker节点
linux02 、linux03 作为worker节点
到spark 展示ui上查看相关信息 linux:01:8080
可以看到master所管worker共有三个,它们可以分配的内存及核数(这两个可以在spark目录下的conf 中设置内存和核数)此时正在运行的Application为0个,接下来我们启动spark-shell进行编程初体验
在bin目录下./spark-shell --master 是指定master所在的机器位置 后面可以再跟上 --executor-memory xxxm --total-executor-cores x 指定每个executor可分配内存多少m,可分配核数多少个,这里我没有指定,所以是默认 如果--master也不指定,则启动模式为本地模式 指定master,为集群提交模式
--master 指定masterd地址和端口,协议为spark://,端口是RPC的通信端口
--executor-memory 指定每一个executor的使用的内存大小
--total-executor-cores指定整个application总共使用了cores core可以认为是线程数
可以清晰的看到,起了一个名为spark shell的Application,而且在worker那也可以看到 内存占用了一个g,核则占用了全部的,也就是说如果不指定每个executor可分配的内存和核数,则默认值为1个g和全部的核数。
先来个编程练练手
回车,再看spark ui展示界面
可以看到已完成的job 数量为1 个,stage为1个,task数量为12个
task schedule
task schedule是driver中的一个任务调度器。它负责将task调度到worker下的executor进程,然后丢入到executor的线程池中进行执行
task
1)spark中任务最小的执行单元
2)将多个task调度到多个executor(一个application在一个worker下只能启一个executor,但不同application在一个worker下可以启多个executor)中分配多个线程来执行task,这样task 就可以 多进程(多个机器下的多个executor)、多线程(一个executor可以使用线程池分配多个线程来执行task)并行的执行task,极大提高spark的运算效率
3)task分为两种
- ShuffleMapTask
(I)专门为shuffle做准备
(II)可以读取各种数据源的数据
(III)也可以读取shuffle后的数据
- ResultTask
(I)可以读取各种数据源的数据
(II)也可以读取shuffle后的数据
(III)专门为了产生计算的结果数据
4)task其实就是一个java对象的实例
- 有属性----------从哪里读数据,读取哪些文件等
- 有方法----------具体如何计算(调用哪些方法或算子,传入了什么函数)
5)task的数量决定着并行度,但task过多也不太行,同时要兼顾考虑到可供分配使用的core
RDD
概念:RDD(Resilient Distributed DataSet)弹性可恢复可容错的、分布式的抽象数据集。为什么说它是抽象的呢?因为这个数据集它本身并不装载数据,而是装载着对这些数据的描述信息,比如以后你要从哪里去读数据,你对RDD做了什么操作(调用了什么方法,传入了什么函数),一旦触发Action,就会生成一个Job,形成一个完整的DAG(有向无环图)。我们只需要对RDD进行编程即可,而无需去关心底层的细节,只要关心具体的计算逻辑即可。
生成RDD的方式:
- 将Driver端集合并行化成RDD 如下图所示
2.读取hdfs的文件 如下图所示
特点:
- 有多个分区(task),分区编号从0开始,分区的数量决定着对应阶段任务的并行度
- 从hdfs中读取
1)分区的数量由hdfs中的数据的输入切片决定
2)sc.textFile()可以指定分区的数量
3)sc.textFile最小的分区数量为2
4)如果一个大文件,一个小文件,大文件大于小文件的1.1倍,大文件会有2个输入切片
5)当分区的数量大于切片的数量,多个Task可以读取一个输入切片;当分区的数量小于切片的而数量,RDD分区的数量由切切数量决定
3. RDD于RDD之间存在着依赖关系
1)RDD调用Transformation后会生成一个新的RDD,子RDD会继承父RDD的依赖关系,关系分为窄依赖关系(没有shuffle)和宽依赖关系(有shuffle)
2)可以根据依赖关系来划分stage
3)可以根据依赖关系来恢复失败的task
4. 如果要发生shuffle,需要使用分区器,如果没有指定分区器,默认使用hashPartitioner,分区器决定着数据到下游的哪个分区内
5. 有一个函数作用在每个输入切片上,每一个分区都会生成一个Task,对该分区的数据进行计算,这个函数就是具体的计算逻辑
6. 大数据行业中不成俗的约定:宁愿移动计算,也不移动数据。如果从hdfs中读取数据,会有一个最优位置:spark在调度任务之前ui读取namenode的元数据信息,获取数据的位置,移动计算而不是数据,这样可以提高计算效率。
7. RDD本身并不装载数据,RDD中保存的是对数据的描述信息(以后去哪读数据,该怎么计算等等),对RDD进行操作,相当于是现在Driver端记录下计算的描述信息,将来生成task,调度到executor端才真正执行计算的逻辑。
RDD的算子分类:
- Transformation算子:即转换算子。调用转换算子会生成一个新的RDD,绝大部分Transformation是lazy的,并不会立即执行,不会触发job,但也有例外,比如sortBy算子,它是一个transformation算子,但是却会触发一次Action,因为它底层进行了采样,collect,触发了一次job,常见的transformation算子有:map、mapValues、flatMap、flatMapValues、keys、values、filter、mapPartitions、union、sort/sortBy、repartition、partitionBy、group/groupByKey、reduceByKey、combinByKey、aggregateByKey、foldByKey、distinct、cogroup、join、leftOuterJoin、rightOuterJoin、fullOuterJoin、subtract
- Action算子:底层会调用sc.runJob方法,会根据最后一个RDD从后往前推,直到推到没有父RDD的RDD为止。触发Action就会就会生成一个Job,就会生成一个DAG(有向无环图),切分stage,生成TaskSet。常见的action算子有:count、saveAsTextFile、collect、aggregate、reduce、min、max、fold、sum、take、first、takeOrdered、top、foreach、foreachPartition、foreachPartitionAsync
Job
1)Driver向executor提交的作业
2)触发一次形成一个完整的DAG
3)一个DAG对应一个Job
4)一个Job里面有一到多个stage,一个stage对应一个TaskSet,一个TaskSet中有一到多个Task
DAG
1)有向无环图
2)RDD与RDD之间依赖关系的描述
3)触发一个Action就会形成一个完整的DAG
4)一个DAG中有1到多个stage,stage的个数等于shuffle的次数+1,一个stage就是一个taskset,一个taskset里面有1到多个task
5)stage可以分为两种,ShuffleMapStage和ResultStage;ShuffleMapStage对应的TaskSet里面自然就是ShuffleMapTask,ResultStage对应的TaskSet里面自然就是ResultTask
TaskSet
1)保存着同一种计算逻辑的task的集合
2)一个TaskSet中的计算逻辑都是一样的,但计算的数据不一样
Stage
1)任务执行的阶段
2)Stage执行是有先后顺序的,先执行前面的stage,后执行后面的stage
3)1个stage就是1个taskset
4)1个DAG中可以有1到多个stage
5)一个taskset中的task的数量取决于这个stage中最后一个rdd的分区的数量
dependency
1)依赖关系,指父RDD与子RDD之间的依赖关系
2)窄依赖:没有shuffle的产生,多个算子可以被优化合并到一个task之后,即在一个pipeline(管道)中
3)宽依赖:有shuffle产生,是划分stage的依据
shuffle
1)需要通过网络将数据传输到多台机器上,数据被打散,就叫做shuffle;但由网络传输,不一定就有shuffle
2)上游RDD的一个分区中的数据给了下游RDD的多个分区,即是shuffle;注意,实际上是下游的task到上游拉取数据,并不是上游task主动发给下游的
DAGScheduler
DAGScheduler是将DAG根据宽依赖切分Stage,负责划分调度阶段并将Stage转成TaskSet提交给TaskScheduler
TaskScheduler
TaskScheduler是将Task调度到Worker下的Exexcutor进程,然后丢入到Executor的线程池的中进行执行
更多学习、面试资料尽在微信公众号:Hadoop大数据开发