Spark基础

Spark基础

一.spark介绍

1.1 spark特点

  • 1.快速高效 :spark可以将中间结果cache到内存中,节省了网络IO和磁盘IO。同时spark使用了dag任务调度思想,将计算逻辑构成了一个有向无环图,同时也会将dag优化后再生成物理计划,所以性能比mapreduce好很多。
  • 2.简洁易用,spark支持scala,java,python,r等语言操作。
  • 3.提供了统一的大数据解决方案。Spark还支持SQL,大大降低了大数据开发者的使用门槛,同时提供了SparkStream和Structed Streaming可以处理实时流数据;MLlib机器学习库,提供机器学习相关的统计、分类、回归等领域的多种算法实现。其高度封装的API
    接口大大降低了用户的学习成本;Spark
    GraghX提供分布式图计算处理能力;PySpark支持Python编写Spark程序;SparkR支持R语言编写Spark程序。
  • 4.支持多种部署方案,1.standalone,spark on yarn, spark on mesos
  • 5.支持多种数据源,hdfs,hbase,hive,alluxio以及任何和hadoop相兼容的。

1.2 spark跟mapreduce优缺点对比

  • 1.mr:mr只能做离线计算,如果实现复杂逻辑,一个mr解决不了,需要将多个mr按照顺序串联计算,然后每一个mr的结果存储在hdfs中,写一个mr将上一个mr的输出结果作为输入,这样就要频繁读写hdfs,网络io和磁盘io是性能瓶颈,从而效能低下。
  • 2.spark:既可以做离线计算,也能做实时计算。提供了抽象的数据集(RDD,Dataset,dataframe,DStream),有高度封装的API,算子丰富,同时使用了更先进的DAG有向无环图,可以优化后再执行执行计划,并且数据可以cache再内粗中,
  • 3.mr的task是以map task, reduce这样的进程级别实例组成,spark的worker,executor也是进程级别实例,但是分配到具体任务的时候,mr还是进程实例,但是spark处理任务的单位task是运行在executor的线程,是多线程级别。

1.3 3.0版本新特性

1.改进spark sql,在运行时对查询计划优化,允许spark planner在允许时执行这些计划,这些计划将在运行时执行可选的计划。计划将基于运行时统计数据优化,提高性能
● 动态合并shuffle partitions
● 动态调整join策略
● 动态优化倾斜的join
● 动态分区裁剪
● ANSI SQL兼容性
● Join hints

1.4

基本概念:

  • 1.application,表示你的应用程序
  • 2.driver,表示main()函数,创建spark context。由spark context 负责和cluster manager通信,进行资源的调度。程序执行完毕关闭spark context.
  • 3.executor,某个application 运行在worker节点的一个进程,该进程负责运行某些task,负责将数据存在磁盘或者内存中。负责将task包装成task
    runner,并从线程池抽一个空闲线程运行task,每一个executor最多能运行的task数量取决于CPU的核数。
  • 4.work, 集群中运行application的节点。
  • 5.task,每个executor进程中执行任务的工作单元,多个task组成stage。
  • 6.stage,又叫task set,由多个task组成
  • 7.job,包括多个task set组成的并行计算,是又action行为触发,
  • 8.dag scheduler。基于job构建基于stage的dag,并提交stage给task scheduler。靠rdd之间的依赖关系划分stage.
  • 9.task scheduler。将stage提交给worker节点运行,每个executor执行什么task就是在这里来。

1.5 spark流程

Spark基础

  • 1.构建spark application环境,启动spark context。 spark context 向资源管理器(standalone或者yarn)注册于申请executor资源。
  • 2.资源管理器分配executor资源,同时启动standaloneexecutorbackend。executor会将运行状态汇报给资源管理器。
  • 3.executor香spark context申请task。
  • 4.spark context将程序代码构建dag有向无环图。然后dag scheduler将dag分解为stage,分配stage给task scheduler。
  • 5.executor向task scheduler申请 task。
  • 6.task scheduler分配task给executor,同时sparkcontext将应用程序代码发给executor。
  • 7.Task在Executor上运行,运行完毕释放所有资源。

二、RDD的使用

1.1 什么是RDD

rdd是一个弹性的分布式数据集,是spark里面最基础的抽象。RDD是一个不可变的、有多分区的、可以并行计算的集合。rdd不装真正要计算的数据,只存放着数据的描述信息。例如从哪里读数据,调用了什么方法,传了什么函数,以及依赖关系等等。

1.2 rdd的特点

  • 1.rdd之间存在依赖关系。RDD是不可改变的,RDD只能通过transformation生成一个新的rdd,子rdd会记录父RDD的一些依赖关系,包括宽依赖和窄依赖
  • 2.rdd存在分区:分区标号一般从0开始,分区的数量决定了对应阶段的task并行度。
  • 3.函数作用在每个分区上,每个分区都生成一个task,对该分区进行计算,整个函数就是具体的计算逻辑。
  • 4.kvrdd在shuffle的时候会有分区器,默认使用hashpartitioner。
  • 5.如果是从hdfs取数据,rdd会从namenode获取数据地址,移动计算,并不是移动数据,可以提高计算效率。

宽依赖和窄依赖
父RDD中的partition和子RDD的partition。如果关系是一对多(父rdd的一个partition数据对应子rdd的多个partition数据),就是宽依赖;父RDD中的partition和子RDD的partition如果是一对一或多对一,就是窄依赖

简单来说,就是,父RDD一个ID分区有1和2两个ID,子RDD只获取了父RDD的1或者2,这就是宽依赖,如果子RDD将1和2都获取了,就是窄依赖

1.3 rdd分类

  • 1.transformation 转换算子,调用转换算子生成新的rdd。transformation 是lazy惰性计算,不触发job执行
  • 2.action 行动算子,调用行动算子会触发job执行,本质上是调用了sc.runjob()方法。该方法会从最后一个rdd,根据依赖关系,从后往前,划分stage,生成taskset。

1.4 创建rdd方式

  • 1.利用parallelize转换集合为rdd val rdd1 = sc.parallelize(Array(1,2,3,4))
  • 2.从指定目录读取文件 val line = sc.textFile("hdfs://192.x.x.x:9000/log")

1.5 常用transformation算子

  • 1.map 单元素映射 val rdd1 = sc.parallelize(Array(1,2,3,4)).map(_*2)
  • 2.flatmap() 先map映射,再压平 val rdd_fm = sc.parallelize(Array(“a b c”,“d e f”)).flatmap(_.split(’ ')).collect();
  • 3.filter 过滤 val rdd_f = sc.parallelize(Array(1,2,3,4)).filter(_%2==0)
  • 4.mapPartitions() 效果跟map一样,但是map是针对单个元素,mapPartitions是针对整个分区的。例如一个分区要处理1w条数据,那么map的fun()就要计算1w次,但是mapPartitions是把1w条数据接收了,再计算1次就行。
   val rdd_mp = sc.parallelize(Array(1,2,3,4)).mapPartitions(it =>
   it.map(x => x*10))
  • 5.mapPartitionsWithIndex,类似于mapPartitions, 不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。●
    mapPartitionsWithIndex,类似于mapPartitions,
    不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。
  • 6.sortBy算子,排序
    sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x,true)
    sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x+"",true)
    sc.parallelize(List(5,11,22,13,2,1,10)).sortBy(x=>x.toString,true)
    7.sortByKey算子,排序
    val rdd1 = sc.parallelize(List((“hello”, 9), (“tom”, 8), (“kitty”, 7), (“tom”, 2)))val rdd2 = rdd1.sortByKey()
  • 8.groupBy算子,按照key分组
    val rdd1 = sc.parallelize(List((“hello”, 9), (“tom”, 8), (“kitty”, 7), (“tom”, 2)))val rdd2 = rdd1.groupBy(_._1)
  • 9.groupByKey 按照key进行 val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd2: RDD[(String,
    Iterable[Int])] = rdd1.groupByKey()
  • 10.reduceByKey val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd3 = rdd1.reduceByKey(+)
  • 11.distinct算子,去重 sc.parallelize(List(5,5,6,6,7,8,8,8)).distinct.collect
  • 12.union算子,并集,两个RDD类型要一样 val rdd6 = sc.parallelize(List(5,6,4,7), 2)val rdd7 = sc.parallelize(List(1,2,3,4), 3)val rdd8 =
    rdd6.union(rdd7)

1.6 常见action 算子

  • 1.collect,将数据以数组形式收集回Driver端,数据按照分区编号有序返回。同时会从远程集群是拉取数据到driver端。 val rdd1 = sc.parallelize(List(1,2,3,4,5), 2) rdd1.collect
  • 2.reduce,将数据以输入的函数进行聚合返回一个值

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
val r = rdd1.reduce(+)

  • 3.count,返回rdd元素的数量

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
var c = rdd1.count

  • 4.top将RDD中数据按照降序或者指定的排序规则,返回前n个元素

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
var c: Array[Int] = rdd1.top(2)

  • 5.take,返回一个由数据集的前n个元素组成的数组

val rdd1 = sc.parallelize(List(3,2,4,1,5), 2)
var c: Array[Int] = rdd1.take(2)

  • 6.saveAsTextFile以文本的形式保存到文件系统中

val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1.saveAsTextFile(“hdfs://node-1.51doit.cn:9000/out2”)

1.7 缓存算子 cache、persis

cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,可以将数据缓存到内存或磁盘【executor所在的磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,可以指定更加丰富的存储基本,支持多种StageLevel,可以将数据序列化,默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,也可以使用其他的序列化方式
cache和persist方法,严格来说,不是Transformation,应为没有生成新的RDD,只是标记当前rdd要cache或persist。

1.8 checkpoint算子 下载中间结果到hdfs

checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,触发多次Action,checkpoint才有意义,多用于迭代计算

1.9 rdd分析

sc.textFile(args(0))
.flatMap(.split(" "))
.map((
, 1))
.reduceByKey(+)
.saveAsTextFile(args(1))

  • 1.假如读取hdfs中的目录有两个输入切片,最原始的HadoopRDD的分区为2,以后没有改变RDD的分区数量,RDD的分区都是RDD,所以有两个rdd
  • 2.在调用reduceByKey方法时,有shuffle产生,要划分Stage,所有有两个Stage
  • 3.第一个Stage的并行度为2,所以有2个Task,并且为ShuffleMapTask。第二个Stage的并行度也为2,所以也有2个Task,并且为ResultTask,所以一共有4个Task

1.10 累加器(只写不读)、广播变量(只读不写)

  • 广播变量:Spark的另一种共享变量是广播变量。通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。
    这样理解,
    一个worker中的executor,有5个task运行,假如5个task都需要这从份共享数据,就需要向5个task都传递这一份数据,那就十分浪费网络资源和内存资源了。使用了广播变量后,只需要向该worker传递一次就可以了。
  • 累加器:一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值
    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。非常类似于在MR中的一个Counter计数器,主要用于统计各个程序片段被调用的次数,和整体进行比较,来对数据进行一个评估。需要注意的是,累加器的执行必须需要Action触发。

1.11 coalesce 和repartition 重新分区

  • 1.coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 如果不写true/false 默认是false
  • 2.repartition实际上是调用的coalesce,一定會进行shuffle的。repartition 底层也是调用coalesce的

假设RDD有N个分区,需要重新划分成M个分区:

  • N < M:
    一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。因为重分区前后相当于宽依赖,会发生shuffle过程,此时可以使用coalesce(shuffle=true),或者直接使用repartition()。
  • 如果N > M并且N和M相差不多(假如N是1000,M是100):
    那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这是前后是窄依赖关系,可以使用coalesce(shuffle=false)。
  • 如果 N> M并且两者相差悬殊:
    这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。
上一篇:Spark阶段总结


下一篇:01- jsp注释