什么是spark?
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。
Spark是基于内存计算的通用大规模数据处理框架。
Spark快的原因:
1.Spark基于内存,尽可能的减少了中间结果写入磁盘和不必要的sort、shuffle(sort:顾名思义就是排序,shuffle:言简意赅就是将数据打散之后再重新聚合的过程,比如groupBy、reduceBykey等)
2.Spark对于反复用到的数据进行了缓存
3.Spark对于DAG进行了高度的优化,具体在于Spark划分了不同的stage和使用了延迟计算技术
spark基于JVM,底层语言用scala编写,java也可以,不过scala最适合,正是由于这个特性,所以在一些方法的用法上和java极度相似(语法略有不同)
spark计算引擎中的核心:SparkCore也就是核心,RDD:弹性数据分布集
在计算上有两种算子:即转换算子(Transformation )和行动算子(Action)
1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
入门之前,首先,进行环境准备,安装以及配置scala环境和spark环境,这里还要用到基于hadoop的spark环境配置,如图:
环境变量也配置一下:
运行CMD查看环境是否就绪:
scala环境已经安装完毕,此时再去看spark环境是否就绪:进入bin目录下运行cmd,输入命令:spark-shell,如下图就是启动成功(环境大致没问题了)
当所有环境准备就绪的时候,这时候,我们就可以编写一个简单的spark例子了:
eg:统计爬取的数据文件(也可以是日志文件等),输出并计算我们想要的数据:
在控制台可以打印出计算结果:如图
也可以将结果保存在本地文件中,直接保存可能会报文件夹已经存在的异常,这里我封装了一个判断目标文件(OUT)是否存在的方法,saveAsTextFile(1,path)//第一个参数是分区数,第二个为输出路径。
第二个例子。eg2:词频统计:
统计单词出现的次数。。。
通过以上两个例子,我们大致上对spark有一个初步的了解,至于一些scala语法还有spark算子以及运算方式,我会在下面附加一些常用的笔记如下:
1.var定义的属性或方法可变,val定义的不可变 ,即在 Scala 中,使用关键词 "var" 声明变量,使用关键词 "val" 声明常量。
2.scala中允许用字符串*3,代表将这个字符串输出三次
3.scala中的s函数,指字符串中的变量替换,(每个变量都以$开头)
${} 在字符串字面量中使用表达式,“${}内可嵌入任何表达式”,包括等号表达式。
println(s"name=$name,age=$age")
println(s"name=$name,age=${age+1}")
eg: var name = "zhangsan"
var age = 15
println(s"name1=$name,age1=$age")
4.例如:
var name:String=_
//在这里,name也可以声明为null,例:var name:String=null。这里的下划线和null的作用是一样的。
var age:Int=_
//在这里,age也可以声明为0,例:var age:Int=0。这里的下划线和0的作用是一样的。
导包中import com.tt._ (这里的_和java中的*是一样的,表示通配符)
5.什么是RDD?
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,他是spark中最基本的数据抽象,他代表一个不可变、可分区、里面的元素可并行计算的集合。
Spark支持两个类型(算子)操作:Transformation和Action
1.Transformation:主要做的是就是将一个已有的RDD生成另外一个RDD。Transformation具有lazy特性(延迟加载)。Transformation算子的代码不会真正被执行。只有当我们的程序里面遇到一个action算子的时候,代码才会真正的被执行。这种设计让Spark更加有效率地运行。
常用的Transformation:
2.Action:触发代码的运行,我们一段spark代码里面至少需要有一个action操作。
6.输入分区与输出分区一对一型
例:# 初始化数据
val rdd1 = sc.parallelize(Array("hello world","i love you"))
Map算子:将原来的RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。
Rdd1.map(_.split(“ ”)).collect
输出结果:res0 : Array[Array[String]] = Array(Array(hello, world), Array(i, love, you))
flatmap算子:将原来的RDD中的每个元素通过函数f转换为新的元素
flatMap = map + flatten 即先对集合中的每个元素进行map,再对map后的每个元素(map后的每个元素必须还是集合)中的每个元素进行flatten
7.getOrElse()主要就是防范措施,如果有值,那就可以得到这个值,如果没有就会得到一个默认值,个人认为早开发过程中用getOrElse()方法要比用get()方法安全得多。
getOrElse(Nil)
Map中的用法
myMap.getOrElse("key", "no key")
//当"key"在Map中不存在时,则返回"no key"
getOrElse的值必然是一个键值对的形式。
8.该函数的作用是对两个RDD结构数据进行压缩合并,将有相同key的数据合并在一起,只保留一个key对应一条数据,从而起到压缩数据的效果,对同一key下的value进行合并的方式可以指定一个计算逻辑C。
reduceByKey((x,y) => x + y)
例如:语句:
val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))
a.reduceByKey((x,y) => x + y)
输出:Array((1,5), (3,10))
解析:很明显的,List中存在两个key分别是1和3,对key为1下的value相加计算 2+3=5
对key为3下的value相加计算 4+6=10
合在一起表示为((1,5), (3,10))
9.scala中的sortBy()方法详解:
Scala中sortBy和Spark中sortBy区别
Scala中sortBy是以方法的形式存在的,并且是作用在Array或List集合排序上,并且这个sortBy默认只能升序,除非实现隐式转换或调用reverse方法才能实现降序,
Spark中sortBy是算子,作用出发RDD中数据进行排序,默认是升序可以通过该算子的第二参数来实现降序排序的方式
10.map与flatMap区别:
map与flatMap区别
Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;
而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:
操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象
操作2:最后将所有对象合并为一个对象(多个元素组成的迭代器)
map
map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD编程 | 31
RDD 中对应元素的值 map是一对一的关系
11.Scala中的Map和java的相似,键值对存储。但是Spark中的map算子和他们不一样,Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象
12.
Driver:Driver是Spark中Application也即代码的发布程序,可以理解为我们编写spark代码的主程序,因此只有一个,负责对spark中SparkContext对象进行创建,其中SparkContext对象负责创建Spark中的RDD(Spark中的基本数据结构,是一种抽象的逻辑概念)
Driver的另外一个职责是将任务分配给各个Executor进行执行。任务分配的原则主要是就近原则,即数据在哪个Executor所在的机器上,则任务分发给哪个Exectuor。
简单来说就是:Driver就是new sparkcontext的那个应用程序类可以成为Driver ,而且Driver的职责是将任务分配给Exectuor执行计算
Executor:是Spark中执行任务的计算资源,可以理解为分布式的CPU,每台机器可能存在多个Executor(因为计算机的CPU有多个核),这些分布式的计算机集群会有很多的Executor,Executor主要负责Spark中的各种算子的实际计算(如map等)
13.Scala中的foreach forall exists map函数及其区别
forall:对集合中的元素进行某个判断,全部为true则返回true,反之返回false。
exists:对集合中的元素进行某个判断,其中之一符合条件则返回true,反之返回false。和forall是一个对应的关系,相当于 and 和 or。
14.函数式编程的一些方法:
遍历( foreach )
映射( map )
映射扁平化( flatmap )
过滤( filter )
是否存在( exists )
排序( sorted 、 sortBy 、 sortWith )
分组( groupBy )
聚合计算( reduce )
折叠( fold )
15.scala中:: , +:, :+ , ::: , ++ 的区别
4种操作符的区别和联系
一、:: 该方法被称为cons,意为构造,向队列的头部追加数据,创造新的列表。用法为 x::list,其中x为加入到头部的元素,无论x是列表与否,它都只将成为新生成列表的第一个元素,也就是说新生成的列表长度为list的长度+1(btw, x::list等价于list.::(x))
二、:+和+: 两者的区别在于:+方法用于在尾部追加元素,+:方法用于在头部追加元素,和::很类似,但是::可以用于pattern match ,而+:则不行. 关于+:和:+,只要记住冒号永远靠近集合类型就OK了。
三、::: 该方法只能用于连接两个List类型的集合。
四、++ 该方法用于连接两个集合,list1++list2 。
16.collect的作用
Spark内有collect方法,是Action操作里边的一个算子,这个方法可以将RDD类型的数据转化为数组,同时会从远程集群是拉取数据到driver端。
已知的弊端
首先,collect是Action里边的,根据RDD的惰性机制,真正的计算发生在RDD的Action操作。那么,一次collect就会导致一次Shuffle,而一次Shuffle调度一次stage,然而一次stage包含很多个已分解的任务碎片Task。这么一来,会导致程序运行时间大大增加,属于比较耗时的操作,即使是在local模式下也同样耗时。
其次,从环境上来讲,本机local模式下运行并无太大区别,可若放在分布式环境下运行,一次collect操作会将分布式各个节点上的数据汇聚到一个driver节点上,而这么一来,后续所执行的运算和操作就会脱离这个分布式环境而相当于单机环境下运行,这也与Spark的分布式理念不合。
最后,将大量数据汇集到一个driver节点上,并且像这样val arr = data.collect(),将数据用数组存放,占用了jvm堆内存,可想而知,是有多么轻松就会内存溢出。
如何规避
若需要遍历RDD中元素,大可不必使用collect,可以使用foreach语句;
若需要打印RDD中元素,可用take语句,返回数据集前n个元素,data.take(1000).foreach(println),这点官方文档里有说明;
若需要查看其中内容,可用saveAsTextFile方法。
总之,单机环境下使用collect问题并不大,但分布式环境下尽量规避,如有其他需要,手动编写代码实现相应功能就好。
补充:
collectPartitions:同样属于Action的一种操作,同样也会将数据汇集到Driver节点上,与collect区别并不是很大,唯一的区别是:collectPartitions产生数据类型不同于collect,collect是将所有RDD汇集到一个数组里,而collectPartitions是将各个分区内所有元素存储到一个数组里,再将这些数组汇集到driver端产生一个数组;collect产生一维数组,而collectPartitions产生二维数组。
.collect() 方法把RDD的所有元素返回给驱动,驱动将其序列化成了一个列表
.take() 返回前几个元素
17.scala中的异常处理之 try 与 Try
1.try是scala内用作异常处理的写法,最常用的写法就是 try catch finally
2.Try的机制有点类似option和Future,如果需要最终获取其中的值,需要通过.get获取,因为他们的执行都是不确定性的。
eg:
一、 try {
num.toInt
} catch {
case e: Exception => 0
}
二、Try(num.toInt).getOrElse(0)
18.Spark的Broadcast
在实际场景中,当1个function传递到1个spark operation(例如:map、reduce)时,这个function是在远程的集群node上被执行的。这些变量会被复制到每一台机器,在远程机器上不会更新这些变量,然后又传送回driver program。跨tasks共享读写变量的支持,通常是低效率的。然而,spark提供了2种通用的共享变量模式:广播变量和累加器。
Broadcast(广播)共享配置文件,map数据集,树形数据结构等,为能够更好更快速为TASK任务使用相关变量。也可以使用,当然也可以使用redis保存共享数据,让每一个task连接redis,获取共享数据
eg:首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进行广播,最后在rdd的每一个partition的迭代时,使用这个广播变量.
val values = List[Int](1,2,3)
val broadcastValues = sparkContext.broadcast(values)
rdd.mapPartitions(iter => {
broadcastValues.getValue.foreach(println)
})
广播变量
应用场景:在提交作业后,task在执行的过程中,
有一个或多个值需要在计算的过程中多次从Driver端拿取时,此时会必然会发生大量的网络IO,
这时,最好用广播变量的方式,将Driver端的变量的值事先广播到每一个Worker端,
以后再计算过程中只需要从本地拿取该值即可,避免网络IO,提高计算效率。
广播变量在广播的时候,将Driver端的变量广播到每一个每一个Worker端,一个Worker端会收到一份仅一份该变量的值
注意:广播的值必须是一个确切的值,不能广播RDD(因为RDD是一个数据的描述,没有拿到确切的值),
如果想要广播RDD对应的值,需要将该RDD对应的数据获取到Driver端然后再进行广播。
广播的数据是不可改变的。
广播变量的数据不可太大,如果太大,会在Executor占用大量的缓存,相对于计算的时候的缓存就少很多。
19.match case 模式匹配用法
eg: //match case 模式匹配
val aaa= "Scala" match{
case "Scala1" => "yes"
case "Spark" => "no"
case _=>"Default"
}
println(aaa) //输出Default
20.scala中对数字保留几位小数的方式
一、使用 浮点数字.formatted("%.2f") //保留两位小数
二、scala中有个方法:BigDecimal(dou).setScale(scale, RoundingMode.HALF_UP).toDouble
下面是封装的方法,int代表保留几位,dou表示需要处理的浮点数
eg:val test1=mScale(4,2.13333)
println("***************=>>>"+test1)
=========================================================================
以上可以作为初学者的参考,欢迎补充交流。