Spark基础
一.spark介绍
1.1 spark特点
- 1.快速高效 :spark可以将中间结果cache到内存中,节省了网络IO和磁盘IO。同时spark使用了dag任务调度思想,将计算逻辑构成了一个有向无环图,同时也会将dag优化后再生成物理计划,所以性能比mapreduce好很多。
- 2.简洁易用,spark支持scala,java,python,r等语言操作。
- 3.提供了统一的大数据解决方案。Spark还支持SQL,大大降低了大数据开发者的使用门槛,同时提供了SparkStream和Structed Streaming可以处理实时流数据;MLlib机器学习库,提供机器学习相关的统计、分类、回归等领域的多种算法实现。其高度封装的API
接口大大降低了用户的学习成本;Spark
GraghX提供分布式图计算处理能力;PySpark支持Python编写Spark程序;SparkR支持R语言编写Spark程序。 - 4.支持多种部署方案,1.standalone,spark on yarn, spark on mesos
- 5.支持多种数据源,hdfs,hbase,hive,alluxio以及任何和hadoop相兼容的。
1.2 spark跟mapreduce优缺点对比
- 1.mr:mr只能做离线计算,如果实现复杂逻辑,一个mr解决不了,需要将多个mr按照顺序串联计算,然后每一个mr的结果存储在hdfs中,写一个mr将上一个mr的输出结果作为输入,这样就要频繁读写hdfs,网络io和磁盘io是性能瓶颈,从而效能低下。
- 2.spark:既可以做离线计算,也能做实时计算。提供了抽象的数据集(RDD,Dataset,dataframe,DStream),有高度封装的API,算子丰富,同时使用了更先进的DAG有向无环图,可以优化后再执行执行计划,并且数据可以cache再内粗中,
- 3.mr的task是以map task, reduce这样的进程级别实例组成,spark的worker,executor也是进程级别实例,但是分配到具体任务的时候,mr还是进程实例,但是spark处理任务的单位task是运行在executor的线程,是多线程级别。
1.3 3.0版本新特性
1.改进spark sql,在运行时对查询计划优化,允许spark planner在允许时执行这些计划,这些计划将在运行时执行可选的计划。计划将基于运行时统计数据优化,提高性能
● 动态合并shuffle partitions
● 动态调整join策略
● 动态优化倾斜的join
● 动态分区裁剪
● ANSI SQL兼容性
● Join hints
1.4
基本概念:
- 1.application,表示你的应用程序
- 2.driver,表示main()函数,创建spark context。由spark context 负责和cluster manager通信,进行资源的调度。程序执行完毕关闭spark context.
- 3.executor,某个application 运行在worker节点的一个进程,该进程负责运行某些task,负责将数据存在磁盘或者内存中。负责将task包装成task
runner,并从线程池抽一个空闲线程运行task,每一个executor最多能运行的task数量取决于CPU的核数。 - 4.work, 集群中运行application的节点。
- 5.task,每个executor进程中执行任务的工作单元,多个task组成stage。
- 6.stage,又叫task set,由多个task组成
- 7.job,包括多个task set组成的并行计算,是又action行为触发,
- 8.dag scheduler。基于job构建基于stage的dag,并提交stage给task scheduler。靠rdd之间的依赖关系划分stage.
- 9.task scheduler。将stage提交给worker节点运行,每个executor执行什么task就是在这里来。
1.5 spark流程
- 1.构建spark application环境,启动spark context。 spark context 向资源管理器(standalone或者yarn)注册于申请executor资源。
- 2.资源管理器分配executor资源,同时启动standaloneexecutorbackend。executor会将运行状态汇报给资源管理器。
- 3.executor香spark context申请task。
- 4.spark context将程序代码构建dag有向无环图。然后dag scheduler将dag分解为stage,分配stage给task scheduler。
- 5.executor向task scheduler申请 task。
- 6.task scheduler分配task给executor,同时sparkcontext将应用程序代码发给executor。
- 7.Task在Executor上运行,运行完毕释放所有资源。
二、RDD的使用
1.1 什么是RDD
rdd是一个弹性的分布式数据集,是spark里面最基础的抽象。RDD是一个不可变的、有多分区的、可以并行计算的集合。rdd不装真正要计算的数据,只存放着数据的描述信息。例如从哪里读数据,调用了什么方法,传了什么函数,以及依赖关系等等。
1.2 rdd的特点
- 1.rdd之间存在依赖关系。RDD是不可改变的,RDD只能通过transformation生成一个新的rdd,子rdd会记录父RDD的一些依赖关系,包括宽依赖和窄依赖
- 2.rdd存在分区:分区标号一般从0开始,分区的数量决定了对应阶段的task并行度。
- 3.函数作用在每个分区上,每个分区都生成一个task,对该分区进行计算,整个函数就是具体的计算逻辑。
- 4.kvrdd在shuffle的时候会有分区器,默认使用hashpartitioner。
- 5.如果是从hdfs取数据,rdd会从namenode获取数据地址,移动计算,并不是移动数据,可以提高计算效率。
宽依赖和窄依赖
父RDD中的partition和子RDD的partition。如果关系是一对多(父rdd的一个partition数据对应子rdd的多个partition数据),就是宽依赖;父RDD中的partition和子RDD的partition如果是一对一或多对一,就是窄依赖
简单来说,就是,父RDD一个ID分区有1和2两个ID,子RDD只获取了父RDD的1或者2,这就是宽依赖,如果子RDD将1和2都获取了,就是窄依赖
1.3 rdd分类
- 1.transformation 转换算子,调用转换算子生成新的rdd。transformation 是lazy惰性计算,不触发job执行
- 2.action 行动算子,调用行动算子会触发job执行,本质上是调用了sc.runjob()方法。该方法会从最后一个rdd,根据依赖关系,从后往前,划分stage,生成taskset。
1.4 创建rdd方式
- 1.利用parallelize转换集合为
rdd val rdd1 = sc.parallelize(Array(1,2,3,4))
- 2.从指定目录读取文件
val line = sc.textFile("hdfs://192.x.x.x:9000/log")
1.5 常用transformation算子
- 1.map 单元素映射 val rdd1 = sc.parallelize(Array(1,2,3,4)).map(_*2)
- 2.flatmap() 先map映射,再压平 val rdd_fm = sc.parallelize(Array(“a b c”,“d e f”)).flatmap(_.split(’ ')).collect();
- 3.filter 过滤 val rdd_f = sc.parallelize(Array(1,2,3,4)).filter(_%2==0)
- 4.mapPartitions() 效果跟map一样,但是map是针对单个元素,mapPartitions是针对整个分区的。例如一个分区要处理1w条数据,那么map的fun()就要计算1w次,但是mapPartitions是把1w条数据接收了,再计算1次就行。
val rdd_mp = sc.parallelize(Array(1,2,3,4)).mapPartitions(it =>
it.map(x => x*10))
- 5.mapPartitionsWithIndex,类似于mapPartitions, 不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。●
mapPartitionsWithIndex,类似于mapPartitions,
不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。 - 6.sortBy算子,排序
sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x,true)
sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x+"",true)
sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x.toString,true)
7.sortByKey算子,排序
val rdd1 = sc.parallelize(List((“hello”, 9), (“tom”, 8), (“kitty”, 7), (“tom”, 2)))val rdd2 = rdd1.sortByKey() - 8.groupBy算子,按照key分组
val rdd1 = sc.parallelize(List((“hello”, 9), (“tom”, 8), (“kitty”, 7), (“tom”, 2)))val rdd2 = rdd1.groupBy(_._1) - 9.groupByKey 按照key进行 val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd2: RDD[(String,
Iterable[Int])] = rdd1.groupByKey() - 10.reduceByKey val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd3 = rdd1.reduceByKey(+)
- 11.distinct算子,去重 sc.parallelize(List(5,5,6,6,7,8,8,8)).distinct.collect
- 12.union算子,并集,两个RDD类型要一样 val rdd6 = sc.parallelize(List(5,6,4,7), 2)val rdd7 = sc.parallelize(List(1,2,3,4), 3)val rdd8 =
rdd6.union(rdd7)
1.6 常见action 算子
- 1.collect,将数据以数组形式收集回Driver端,数据按照分区编号有序返回。同时会从远程集群是拉取数据到driver端。 val rdd1 = sc.parallelize(List(1,2,3,4,5), 2) rdd1.collect
- 2.reduce,将数据以输入的函数进行聚合返回一个值
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
val r = rdd1.reduce(+)
- 3.count,返回rdd元素的数量
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
var c = rdd1.count
- 4.top将RDD中数据按照降序或者指定的排序规则,返回前n个元素
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
var c: Array[Int] = rdd1.top(2)
- 5.take,返回一个由数据集的前n个元素组成的数组
val rdd1 = sc.parallelize(List(3,2,4,1,5), 2)
var c: Array[Int] = rdd1.take(2)
- 6.saveAsTextFile以文本的形式保存到文件系统中
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1.saveAsTextFile(“hdfs://node-1.51doit.cn:9000/out2”)
1.7 缓存算子 cache、persis
cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,可以将数据缓存到内存或磁盘【executor所在的磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,可以指定更加丰富的存储基本,支持多种StageLevel,可以将数据序列化,默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,也可以使用其他的序列化方式
cache和persist方法,严格来说,不是Transformation,应为没有生成新的RDD,只是标记当前rdd要cache或persist。
1.8 checkpoint算子 下载中间结果到hdfs
checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,触发多次Action,checkpoint才有意义,多用于迭代计算
1.9 rdd分析
sc.textFile(args(0))
.flatMap(.split(" "))
.map((, 1))
.reduceByKey(+)
.saveAsTextFile(args(1))
- 1.假如读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区都是RDD,所以有两个rdd
- 2.在调用reduceByKey方法时,有shuffle产生,要划分Stage,所有有两个Stage
- 3.第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task
1.10 累加器(只写不读)、广播变量(只读不写)
- 广播变量:Spark的另一种共享变量是广播变量。通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。
这样理解,
一个worker中的executor,有5个task运行,假如5个task都需要这从份共享数据,就需要向5个task都传递这一份数据,那就十分浪费网络资源和内存资源了。使用了广播变量后,只需要向该worker传递一次就可以了。 - 累加器:一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。非常类似于在MR中的一个Counter计数器,主要用于统计各个程序片段被调用的次数,和整体进行比较,来对数据进行一个评估。需要注意的是,累加器的执行必须需要Action触发。
1.11 coalesce 和repartition 重新分区
- 1.coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 如果不写true/false 默认是false
- 2.repartition实际上是调用的coalesce,一定會进行shuffle的。repartition 底层也是调用coalesce的
假设RDD有N个分区,需要重新划分成M个分区:
- N < M:
一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。因为重分区前后相当于宽依赖,会发生shuffle过程,此时可以使用coalesce(shuffle=true),或者直接使用repartition()。 - 如果N > M并且N和M相差不多(假如N是1000,M是100):
那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这是前后是窄依赖关系,可以使用coalesce(shuffle=false)。 - 如果 N> M并且两者相差悬殊:
这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。