文章目录
- Spark学习笔记
- 第一章、基本认识与快速上手
- 第二章、环境搭建
- 第三章、Spark运行架构
- 第四章、Spark核心编程
- 4.1、RDD
- 4.2、RDD基础编程
- 4.2.1、RDD创建
- 4.2.2、分区与并行度
- 4.2.3、RDD算子
- 4.2.4、转换算子
- 4.2.4.1、mapPartition
- 4.2.4.2、mapPartitionsWithIndex
- 4.2.4.3、glom
- 4.2.4.4、groupBy
- 4.2.4.5、filter
- 4.2.4.6、sample
- 4.2.4.7、distinct
- 4.2.4.8、coalesce
- 4.2.4.9、sortBy
- 4.2.4.10、交集、并集、差集、Zip
- 4.2.4.11、partitionBy
- 4.2.4.12、reduceByKey
- 4.2.4.13、groupByKey
- 4.2.4.14、aggregateByKey
- 4.2.4.15、foldByKey
- 4.2.4.16、CombineByKey
- 4.2.4.17、Join
- 4.2.4.18、Left(Right)OuterJoin
- 4.2.4.19、cogroup
- 4.2.5、行动算子
- 4.2.6、算子小结
- 4.2.7、闭包检查和序列化
- 4.2.8、闭包检查(二)
- 4.2.8、闭包检查(二)
- 4.2.9、Kryo序列化框架
Spark学习笔记
ohhhhh~从今天开始就要开始学习大数据中的杀手级计算引擎——Spark。了解大数据的不可能没有听说个Spark吧!听这个名字就感觉很炫酷哦!!火花!下面正式进入学习吧!
第一章、基本认识与快速上手
Spark官网:http://spark.apache.org/
1.1、认识Spark
我们在学习Hadoop的时候,在网上就能看到很多Spark和Hadoop的比较视频,并且明显可以看出Spark深受互联网大厂的青睐,不少互联网大数据公司都开始使用Spark作为其主流计算引擎,那么Spark究竟为什么俘获企业的喜爱呢?还是从它官网的那句slogan说起!
Lightning-fast unified analytics engine(闪电般的统一分析引擎!)
回想一下我们学习Hadoop时,MR任务的执行慢到宁人抓狂!那么Spark则是抓住了Hadoop这个弱点狠狠下了一顿功夫进行了优化!所以Spark常用的定义就是:
基于内存的快速、通用、可扩展的大数据分析计算引擎
一看到基于内存,我们就知道Spark快在哪里了!
1.2、对比Hadoop
Hadoop:
Hadoop起源于Apache Nutch项目,始于2002年,是Apache Lucene的子项目之一 。2004年,Google在“操作系统设计与实现”(Operating System Design and Implementation,OSDI)会议上公开发表了题为MapReduce:Simplified Data Processing on Large Clusters(Mapreduce:简化大规模集群上的数据处理)的论文之后,受到启发的Doug Cutting等人开始尝试实现MapReduce计算框架,并将它与NDFS(Nutch Distributed File System)结合,用以支持Nutch引擎的主要算法 。由于NDFS和MapReduce在Nutch引擎中有着良好的应用,所以它们于2006年2月被分离出来,成为一套完整而独立的软件,并被命名为Hadoop。到了2008年年初,hadoop已成为Apache的*项目,包含众多子项目,被应用到包括Yahoo在内的很多互联网公司
内容来源:百度百科
Spark:
Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
内容来源:百度百科
两者如今都是Apache软件基金会的*项目!但是也不免有人会将他们放在一起比较,甚至他们的之间也会使用数据进行相互比较。相爱相杀~
通过百度百科对Spark的描述,我们就已经能够清晰了解到Spark究竟快在哪里。我们现在再来复习一下Hadoop的Job执行流程,用画图的方式来学习Spark优化之处!
这是一个Job的执行的大致流程:
中间有两次磁盘的IO操作,前者是Map的Shuffle结果序列化到磁盘中),后者是将一个Job的计算结果存入了磁盘!这两次磁盘IO操作就会大大降低整个Job的效率!并且复杂的MR也会带来一定的时间开销!并且将Job的结果存入磁盘,就注定了在多计算任务复杂需要串联多个Job时,就会无故增加IO操作的次数!这也就是为什么MapReduce不适合做迭代式计算的原因!
所以我们称MapReduce是基于磁盘的计算引擎!
而Spark针对上述一系列进行了重新设计和优化,优化了MapReduce的操作使用RDD计算模型,并且可以将Shuffle的结果保存在内存中,Job的计算结果也保存在内存中,这样的话读写速度大大提升,也更方便迭代式计算!所以Spark的场景定位就不同于Hadoop!
我们称Spark是基于内存的计算引擎!
Spark比Hadoop快的主要原因有:
消除了冗余的HDFS读写
Hadoop每次shuffle操作后,必须写到磁盘,而Spark在shuffle后不一定落盘,可以cache到内存中,以便迭代时使用。如果操作复杂,很多的shufle操作,那么Hadoop的读写IO时间会大大增加。消除了冗余的MapReduce阶段
Hadoop的shuffle操作一定连着完整的MapReduce操作,冗余繁琐。而Spark基于RDD提供了丰富的算子操作,且reduce操作产生shuffle数据,可以缓存在内存中。JVM的优化
Spark Task的启动时间快。Spark采用fork线程的方式,Spark每次MapReduce操作是基于线程的,只在启动。而Hadoop采用创建新的进程的方式,启动一个Task便会启动一次JVM。
Spark的Executor是启动一次JVM,内存的Task操作是在线程池内线程复用的。
每次启动JVM的时间可能就需要几秒甚至十几秒,那么当Task多了,这个时间Hadoop不知道比Spark慢了多少。
但是要说明一下的是:**Spark并不是MapReduce的高阶替代品,他和MapReduce的关系更像是互补!**因为Spark也有使用的局限:由于Spark是基于内存的,所以对内存资源要求严格,当资源不足时可能会导致计算任务无法完成,而此时对机器性能要求不那么高的MapReduce就是不二选择!
1.3、Spark组成基本介绍
spark的官网有这样一幅图:
很明显标注出了Spark的五个模块,以及他们之间的关系!
- Apache Spark(Spark Core):这是我们将要学习的Spark,他是Spark的核心部分!其余四个模块都是基于SparkCore进行扩展!
- SparkSQL:用于操作结构化数据!
- SparkStreaming:主要用于实时计算!
- SparkMLlib:主要用于machine learning(机器学习)!
- SparkGraphx:主要用于图像的计算处理
1.4、快速上手之WorldCount实现
前置准备
-
IDEA(集成开发环境,IDE)
-
创建maven工程,导入Spark3.0依赖
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.0</version> </dependency> </dependencies>
这里注意,使用的Scala版本应该是Scala-2.12!!版本要对应哦!
-
本地需要配置Hadoop,以及winutil.exe相关文件(否则程序启动会报ERROR,但并不影响计算结果!)
使用Spark的基本步骤!
- 创建配置类
SparkConf
,可以使用各种set方法,对配置进行修改。 - 创建与Spark连接
SparkContext
- 业务操作。。(略)
- 使用SparkContext的
stop()
方法关闭与Spark的连接!
object WordCountDemo {
def main(args: Array[String]): Unit = {
// 1. 建立连接
// 配置设置
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
// 连接
val context = new SparkContext(sparkConf)
// 2. 业务操作
// 3. 关闭连接
context.stop()
}
}
下面wordcount的计算处理过程就放在步骤3中来完成!
简要回顾WordCount的问题处理过程(复习MapReduce手动实现WordCount)
- 读取文件(逐行读取)
- 将行数据按照设定的分隔符拆分成一个个单词
- 将单词设为Key,进行分组排序
- 每组统计单词出现次数即Value
- 结果输出
数据:
words1.txt=>
Hello Spark
Hello Scala
Hello World
words2.txt=>
Hello Scala
Hello Spark
Spark Hello
1.4.1、方式一(Scala类似集合操作实现)
完整代码:
object WordCountDemo {
def main(args: Array[String]): Unit = {
// 1. 建立连接
// 配置设置
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
// 连接
val context = new SparkContext(sparkConf)
// 2. 业务操作
// 1)读取文件,获取到一行行的数据
val lines = context.textFile("datas/*")
// 2) 行数据扁平化,获取到一个个单词
val words = lines.flatMap(_.split(" "))
// 3) 将单词分类放置,得到类似(hello, hello, hello)(scala, scala)
val wordGroups = words.groupBy(word => word)
// 4) 改变形式,得到(hello, 4)的样子
val result = wordGroups.collect {
case (word, list) => (word, list.size)
}
// 5) 结果输出到控制台
result.foreach(println)
// 3. 关闭连接
context.stop()
}
}
结果:
(Hello,6)
(World,1)
(Scala,2)
(Spark,3)
里面有一个groupBy
是我在学习Scala过程中没有涉及到的,所以我将其相关的使用点补充到Scala的学习笔记中:11.2.9、groupBy!
上面这种写法虽然实现了目标功能,但是会感觉我们并没有做MR中Reduce端类似的聚合操作,而是取巧使用了Scala集合中size这个属性获取次数!
而正确的执行过程应该是若干个<word, 1>
进行聚合得到<word, n>
1.4.2、方式二(MR思维实现)
利用MR思维,使用聚合方式的话,我们就不能用size方法了,并且扁平化后得到的单词也要封装成Tuple2(word, 1)
。分组后,使用reduce()
聚合,得到最终的结果(word, n)
object WordCountDemo2 {
def main(args: Array[String]): Unit = {
// 1. 建立连接
// 配置设置
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
// 连接
val context = new SparkContext(sparkConf)
// 2. 业务操作
// 1)读取文件,获取到一行行的数据
val lines = context.textFile("datas/*")
// 2) 行数据扁平化,获取到一个个单词
val words = lines.flatMap(_.split(" "))
// 将单词封装成(word, 1)
val wordTuples = words.map((_,1))
// 3) 将单词分类放置,得到类似 hello => ((hello, 1), (hello, 1), (hello, 1))
val wordGroups = wordTuples.groupBy(_._1)
// 4) 聚合,得到(hello, 4)的样子
val result = wordGroups.collect {
case (word, wordList) => wordList.reduce{
(word1, word2) => (word1._1, word1._2 + word2._2)
}
}
// 5) 结果输出到控制台
result.foreach(println)
// 3. 关闭连接
context.stop()
}
}
我们增加了一个包装word的步骤,并修改了后面求结果的方式!我们用一张图来对比一下!
可以好好看看代码在最后聚合时是如何计算出最后的结果的!
但是这样写依旧不满意!这里面貌似除了这些集合是RDD相关的,并且其使用的方法和Scala中集合的方法没什么不同,我们还没有看到Spark的影子!我们希望利用Spark的方式实现!那么就要用到特殊方法啦!
1.4.3、方式三(Spark实现)
object WordCountDemo3 {
def main(args: Array[String]): Unit = {
// 1. 建立连接
// 配置设置
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
// 连接
val context = new SparkContext(sparkConf)
// 2. 业务操作
// 1)读取文件,获取到一行行的数据
val lines = context.textFile("datas/*")
// 2) 行数据扁平化,获取到一个个单词
val words = lines.flatMap(_.split(" "))
// 将单词封装成(word, 1)
val wordTuples = words.map((_,1))
// 使用Spark reduceByKey()方法
val result = wordTuples.reduceByKey(_+_)
// 5) 结果输出到控制台
result.foreach(println)
// 3. 关闭连接
context.stop()
}
}
一个reduceByKey()
直接解决了分组和聚合!官方的源码注释是说**利用这个函数,对相同Key的数值进行一次Merge 在mapper发送数据到reducer之前!**说明这里已经用到了MapReduce来执行任务,而这个函数的作用只是类似于我们学习Hadoop-MapReduce时所说的Mapper阶段的combine!在Mapper阶段结果进行一次合并!
reduceByKey()
所处理的是具有相同key的键值对的value部分!所以我们使用(_+_)
就表示value累加。
可以看出来,使用了Spark的特有方法无论是对代码的简洁性,还是执行的效率都有一定的提升!
题外话:Spark日志问题
使用默认的Log4j的配置,会输出很多无关的INFO日志,我们只需要看ERROR日志就行了,所以我使用一下配置就行了(在resource目录下创建log4j.properties粘贴一下内容即可)
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
第二章、环境搭建
上面我们都是使用的IDE,在集成开发环境下我们可以开发并运行环境,但是工作中我们的Spark程序一定是放服务器集群中运行的!在国内工作中主流的开发环境是Yarn,但是现在容器式环境逐渐流行!
除此以外还有Local(本地模式)、Standalone(独立模式)
前置准备
- 虚拟机
- Spark安装包
2.1、Local模式
本地模式我们也可以理解为单机模式,没有集群的概念,不需要其他任何节点资源就可以在本地运行Spark的代码!(区别于IDEA中的local,idea中只是为项目创建了开发环境,并没有为机器创建。。)
搭建步骤
-
解压tgz包,到指定文件夹下,并进入spark目录
-
bin/spark-shell
启动spark的命令行模式一些关键点,图中已经用红框标了出来!
-
我们开发中所用的
SparkContext
在命令行中已经为我们预先准备好了:即sc
-
给了一个SparkContext浏览的web页面,可以通过
4040
端口访问!就是一个Job的监控页面
-
scala>
命令行,表明里面可以直接写scala的代码,也就是说我们可以直接在命令行里面完成Spark程序并运行!
使用jps命令可以查看到
spark-shell
是一个SparkSubmit进程,也就是我们可以使用这种方式向本地Spark提交并运行我们的Spark程序(即Job) -
2.1.1、SparkShell命令行执行
使用命令行运行一次Spark程序(以WC为例):
-
在spark的data目录下创建我们的数据文件:words.txt
-
在命令行中编写程序代码:
2.1.2、spark-sublime提交任务
命令行方式在实际开发中并不会使用,正确的姿势应该是**使用集成开发环境完成Spark程序的开发和调试,然后将程序打包,然后扔到服务器上提交执行!**下面我们利用Spark的示例Jar包来测试一下程序的提交。
使用spark-submit
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
运行结果:
这是Spark官方提供的案例程序,暂且不关心其正确性,但是确实能够正确运行!
2.1.3、提交任务的参数说明
bin/spark-sublimt \
--class xxx.xxx.xx \
--master xx \
[ApplicationJar] \
[ApplicationParam] \
[Others]
参数 | 说明 |
---|---|
--class |
Spark程序主函数所在类 |
--master |
Spark程序的运行模式(环境)【local】、【spark://xxxx:7077】、【Yarn】 |
--executor-memory 1G |
每个executor可用的内存大小1G |
--total-executor-cores n |
所有的executor共可用的CPU核心数量n |
--executor-cores n |
每个executor可以用的CPU核心数n |
[ApplicationJar] |
Spark程序所在的Jar包 |
[ApplicationParam] |
Spark程序运行时主函数的参数 |
以上是本地模式的搭建和使用,较适合学习和教学。
2.2、Standalone模式
此模式又称为独立部署模式,即不需要其他额外环境(自己提供计算资源),但是有鲜明的主从关系,这里需要用到三台虚拟机,他们分别担任的角色是:
Host | 角色 |
---|---|
hadoop102 | Master、Worker |
hadoop103 | Worker |
hadoop104 | Worker |
从Local模式转换到Standalone模式需要修改一些配置文件!
2.2.1、配置改动与启动
conf/slaves
- 和以往一样,将template文件后缀去掉
- 文件原始内容是localhost,我们需要改为我们三台机器的host名(注意/etc/hosts文件中的映射)
conf/spark-evn.sh
-
第一步相同操作
-
添加
export JAVA_HOME=xxx
配置本机的Jdk环境 -
设置集群master节点
SPARK_MASTER_HOST=hadoop102 # 7077是Spark中默认的Master的通信接口 SPARK_MASTER_PORT=7077
这里参考自己集群的配置。。
修改结束后,将整个Spark安装文件分发给其他两台机器!(不知道分发的,去看Hadoop的笔记)
启动与检查
在master节点下执行:sbin/start-all.sh
,群起。
使用jps查看进程:
然后访问master_host1:8080,(hadoop102:8080)页面可以看到当前集群的状态!
测试提交任务
此时提交任务所用的环境不再是local,而是spark://master_host1:7077
所以要改写成:--master spark://hadoop102:7077
没有问题!正常运行!
使用sbin/stop-all.sh
关闭Spark集群!
2.2.2、历史服务开启
之前我们使用spark-shell时,我们访问的job监控页面,当shell退出后就无法访问了再次启动就是全新的,但是我们希望能随时对历史Job的执行进行查看,那么就需要配置历史服务器(Hadoop中也做过类似的配置!)
启动历史服务的话,就涉及到历史记录日志的存放,所以这里还需要用到Hadoop集群,利用HDFS存放日志文件!
Spark配置修改
conf/spark-default.conf
-
还是去掉template后缀
-
增加以下配置:
spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:9000/directory
注意spark.eventLog.dir对应的是HDFS中的路径,记不清HDFS端口号的,查看hadoop安装目录下etc/hadoop/core-site.xml文件中NameNode配置
下面的我hdfs的配置:
<!--指定HDFS中NameNode的地址--> <property> <name>fs.defaultFS</name> <value>hdfs://hadoop102:9000</value> </property>
并且要保证配置中这个hdfs目录是已创建的!
所以要启动一下Hadoop集群(启动HDFS应该就够了)!检查一下!
conf/spark-env.sh
-
加入以下内容:
export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory -Dspark.history.retainedApplications=30"
解释以下这三条配置:
- 历史服务器的web页面访问端口:8080
- 历史服务的日志文件放置的位置:hdfs://hadoop102:9000/directory
- 历史服务中保留历史Spark程序的数量:30
启动检查
修改结束后,分发以下conf目录
然后确保HDFS服务器已经启动!然后启动Spark集群,最后启动历史服务:
sbin/start-history-server.sh
任意执行一个Spark程序,然后通过hadoop102:18080,访问web页面检查历史服务是否生效
sbin/stop-history-server.sh
关闭历史服务!
2.2.3、配置高可用
目前我们的Spark集群,只有一个master节点,就会出现单点故障(即一个节点出错,会导致整个系统不可用)的问题!所以最简单的办法就是再增设一个master节点作为后备!
Host | 角色 |
---|---|
hadoop102 | Master、Worker |
hadoop103 | Worker、Master[备用] |
hadoop104 | Worker |
而这个配置,需要zookeeper参与(通常高可用的配置都离不开zookeeper!)!
配置修改
spark.env.sh
-
将我们之间单节点master的配置注释掉,然后加入以下内容
# 单节点Master配置注释掉 #SPARK_MASTER_HOST=hadoop102 #SPARK_MASTER_PORT=7077 # 加入以下内容,这里集群的web页面端口不使用默认8080,担心会被zookeeper占用!改使用8989 # zookeeper.url配置集群中所有节点的host SPARK_MASTER_WEBUI_PORT=8989 export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104 -Dspark.deploy.zookeeper.dir=/spark"
启动检查
分发conf/目录!!
在启动前,先启动zookeeper集群!!
然后启动Spark集群,这样我们只是按常规启动了Spark集群,我们的备用的master(hadoop103中)依旧没有启动,我们需要到对应的机器下,手动单独启动master进程!sbin/start-master.sh
为确保正确性,可以使用zkCli.sh连接zookeeper集群,
看看/spark这个node有没有被创建!/spark/leader_election下应该有两个node才对!分别对应两个master节点
此时我们访问hadoop102:8989、hadoop103:8989都是没有问题的。
测试提交任务
此时--master
的参数又要变化了:应改写为
--master spark://hadoop102:7077,hadoop103:7077
没有问题!
这里我犯傻遇到一个问题,我们之前设置了将日志写到hdfs中,所以如果不启动hdfs集群的话会导致Job执行失败!!(启动hdfs集群后,也不要急着启动Job,因为safemode下是不能文件创建的,很可能也会失败!等待safemode关闭后再提交Job即可!)
模拟master宕机,测试高可用
使用kill -9 xx
关闭正在正常运行的master进程,看看是否备用的master是否会被选举上位。
这个选举上位的过程需要耐心等待~ 最终实测可用!
2.3、Yarn模式
Yarn模式在工作中是最常用的部署模式!在开始介绍Spark的时候就说明了它是一个计算引擎!虽然他可以不依赖第三方框架通过独立部署就能完成整个计算资源调度。但是终归是一个计算框架,在资源调度方面并不如专业框架!所以还是那句话:**专业的事情让专业的人去做!**Hadoop的Yarn就是资源调度方面的高手!
那么此时我们将利用Spark只是将其作为计算引擎,替代MapReduce。这样一来Hadoop、Spark混合部署,各司其职、各尽其用!
搭建步骤
Hadoop配置文件:yarn-site.xml
加入以下配置:
<!-- 关闭任务物理内存使用检查,默认(true)开启 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!-- 关闭任务虚拟内存使用检查,默认(true)开启 -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
说过Spark在计算时占用的内存资源很多,所以为了避免Yarn误杀了计算任务,我们关闭了物理内存和虚拟内存的检查!
spark-env.sh
现在我们只要Spark的计算功能,之前那些什么Worker、Master都是用于资源调度的,现在我们都可以不要了!并且只需要安装在一台机器上就行了,我们需要的是Spark的一些基本配置,以及它计算相关的jar包!
将Standalone的配置内容注释掉,保留历史服务和Java环境的配置,然后加入以下配置:
# 配置Yarn的配置目录,以自己机器为准
YARN_CONF_DIR=/opt/modules/hadoop-2.7.7/etc/hadoop
启动测试
这次我们不分发,也不用启动Spark,如果需要我们仅需启动Spark的历史服务!
在此之前要保证Hadoop集群完全启动(HDFS、Yarn都启动!)
因为现在我们是混合部署,我们的重心是Hadoop,Spark此时扮演的是MapReduce的替代者!!
Spark提交任务
你懂的,--master
又变了,这次变成了--master yarn
同时我们需要附加参数:设置部署模式–-deploy-mode [cluster/client]
(cluster和client二选一)
当你选择cluster部署模式(集群部署模式):结果不会在控制台输出!但是选择client(客户端模式),就会看到结果再控制台输出。
既然使用了Yarn,那么在yarn的监控页面:(由于我集群上ResourceManager是在hadoop103上的) hadoop103:8088就能看到所有任务的执行情况!
这就回到了我们熟悉的界面~
如果开启了Spark的历史服务,在Spark的历史服务的页面hadoop102:18080也能看到历史程序的运行。
2.4、三种模式对比
模式 | Spark安装机器数量 | 需启动进程 | 所属者 | 应用场景 |
---|---|---|---|---|
Local | 1 | NONE | Spark | 测试、学习 |
Standalone | 3 | Master、Worker | Spark | 单独部署 |
Yarn | 1 | Yarn、HDFS | Hadoop | 混合部署 |
这三种模式中,Yarn部署模式算是物尽其用!充分发挥了Spark的计算性能以及Yarn的资源调度功能!
第三章、Spark运行架构
3.1、核心架构初认识
3.1.1、计算核心组件
作为一个以计算为核心的框架,那么它一定有完善的用于计算的核心架构!
先看下面这张图:
很清楚就能看到两个部分:
- Driver
- Executor
这就是Spark计算的两个核心组件!Spark是标准的主从架构。那么这两者的关系很显然Driver是主、Executor是从!
下面我们来简单认识一下他们分别在计算任务中发挥什么作用吧!
Driver
驱动?!见名知意,他是整个计算任务的动力来源!他会驱动控制整个任务顺利执行,他的主要工作有:
- 将Spark程序转化为计算任务,分发给Executor执行
- 在Executor之间调度任务(Task)
- 跟踪监控Executor的任务执行情况
- 用过UI展示任务的执行情况
Driver其实并不参与任何计算步骤!他只是整个任务的“启动者”以及“监工”。
主要是将程序转化为任务,然后发给Executor(打工人)去完成,协调集群完整计算任务
至于他是如何将程序转化为任务的,我们后面再说~
Executor
执行器,对,我就是一个老老实实的打工人!你给我们任务我完成就行了。所以它的工作非常简单:
- 执行Driver分配的任务,并返回结果
- 缓存部分需要缓存的数据,以提高计算的效率!
Executor才是计算的实施者,他们会完成Driver发给他们的Task,并将结果反馈给Driver。虽然是可怜的打工人,但是他们的地位不可小看!
3.1.2、资源调度核心组件
我们在使用standalone模式部署时,我们使用Spark自带的资源调度组件Worker&Master。他们之间的关系也是普通的主从关系,我们甚至可以把他们俩拿来和Yarn的两个组件类比!
Worker相当于NodeManager,Master则相当于ResourceManager。
Worker管理着单个计算节点的资源,而Master则是掌管着整个集群的资源!
我们的计算任务总会涉及到资源的请求,而这个请求首先是由Driver发起(要准备要一个计算任务,就得算好资源的利用),但是如果让申请到Master那里,计算框架的组件和资源调度框架的组件这样直接交互,就会造成耦合!所以为了降低耦合,只好在他们之间加上一层:集群管理者图中的(Cluster Manager),然后由集群管理者向集群的资源管理者也就是Master“转述”这个资源请求,说了这么久的资源,其实说白了就是executor。请求成功后,会按照要求在Worker节点上启动若干个executor,准备执行Driver发送的任务!
以上是利用Worker和Master完成的,如果换上Yarn,那么资源的请求与分配就要按照Yarn那一套来了。(ResourceManager -> NodeManager -> ApplicationMaster -> Job)
3.2、架构相关知识了解
简单认识了Spark中重要的两部分架构后,还有一部分细节和相关内容我们需要了解。这里我们暂时只做了解,并不深入其中
3.2.1、Executor和Core
上面提到了资源就是executor,这句话其实并不严谨!在每个Worker节点上确实可以有若干个executor,(提交Spark程序时使用--num-executors n
指定executor的数量)
Executor是运行在Worker节点上的一个JVM进程,专门用于执行计算任务,需要内存、CPU资源。
所以我们可以近似认为executor就是资源!
至于每个executor的占用资源如何设置,我们在2.1.3中就说明了,可以返回去看一看哦!
但是得说明一下:里面所说的CPU核心数量,其实是虚拟CPU核心数!(因为这个设置是不受限于机器的配置的!)当资源充足时,他可以实现并行计算,若资源不够则是使用并发来模拟并行!
下面我们就来说一说这个并行与并发!
3.2.2、并行度(Parallelism)
并行度:同时执行的任务(Task)数量!
在Worker中我们的executor被分配到的CPU核心,并非代表我们机器的CPU核心。而是相当于给他开启了若干个线程,这些线程要同时去抢占机器的CPU的核心,这也就是我们常说的并发!
而并行,则是资源充足情况下(CPU多核),每个虚拟CPU(线程)都能拿到一个物理CPU核心,核心之间并行执行!
所以说合理设置资源数量是可以提高计算的并行度的!!
并行和并发
参考博客:并行与并发的区别https://blog.csdn.net/java_zero2one/article/details/51477791
并行是物理上多个核心处理多个线程各做各的事情!
而并发则是一个核心,通过快速切换执行不同线程上的任务,以达到宏观上的并行效果,其实微观上还是串行执行!
用参考博客中的例子生动说明:
并行就是两把铁锹,两个人,各挖各坑!
并发是一把铁锹,两个人,你挖一下然后他挖一下!(仿佛两个人一起在挖坑)
可想而知前者效率更高!
所以在工作中Spark的性能调优,资源的分配是很重要的!保证资源最大化利用不浪费,并提高计算的并行度!
3.2.3、有向无环图(DAG)
这个东西听着好像和计算没有半毛钱关系,其实这个是Spark计算任务执行流程的重要指标!
由于Spark是分布式的并行计算,一个计算任务会被分解成多个小任务放到不同的计算节点上完成,那么如果这些任务之间有相互依赖的关系,就需要用有向无环图来确定他们的执行顺序!
他主要是解决循环依赖 的问题,在Spring,Maven中应该都听过这个词,循环依赖会导致程序崩溃!我们需要避免循环依赖的出现!计算任务也是一样,利用DAG先确定下计算任务的执行顺序,然后按照顺序执行任务即可!
3.2.4、提交流程
提交Job的流程,主要分为两个方面:程序转为任务的准备,资源申请;
看这张图,SparkContext即Driver一边在进行Task的准备(黄色框),一边还在申请资源!(蓝色框)但是两者之间是由先后顺序的!
- 先申请资源,申请到资源后,Executor启动并反馈情况,做好准备接受Task!
- 然后Driver会将程序转为Job,并划分为Task发放给申请到的Executor。然后在Worker节点上Executor执行完Task,将结果返回,并同时回资源!
这是大致的任务提交流程,我们使用Yarn作为资源调度框架时,说了两种部署模式:
- Cluster
- Client
那么这两种部署模式的区别是什么呢?!**Driver程序运行节点的位置!**用两张图来看一下吧:
Cluster:在集群中!(SparkContext在Yarn集群中)
Client:在集群外!
至于他们之间更深层次区别,我们后面再说吧!
第四章、Spark核心编程
为了满足Spark分布式计算需要,封装了三大数据结构:
- RDD:弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
在这之前我们先简单了解一下什么是分布式计算。
首先很明确的一点:计算任务由Driver发出,由Worker节点上的Executor接收执行。
分布式计算,也就是我们有2个及以上Worker节点来共同完成Driver发出的任务,那么他们之间就必须得有任务的划分,以及结果的合并。为了支持这种分布式的计算,上面三种数据结构随之诞生!
4.1、RDD
4.1.1、什么是RDD?
弹性分布式数据集(Resilient Distributed Datasets)。初次接触可能会感觉难懂,我们稍后将其拆分开来说。他在整个计算中担任的是数据处理模型的角色。是最小的计算单元!
计算单元:即数据+复杂计算逻辑。而复杂的计算逻辑可以拆分成多个简单的计算逻辑;所以最小计算单元就是数据+最简单的计算逻辑。
在Spark中可以将一个数据处理程序看做为一个的计算单元。这个计算单元我们需要拆分成若干个最小计算单元发送给Executor执行。
总结起来就是一句话**“一个庞大的计算任务,我们可以使用若干RDD进行关联形成复杂的计算逻辑,以完成计算。”**
4.1.2、RDD运行原理
是不是已经崩溃了?!说的都是些什么啊?!没关系现在我们从我们较为熟悉的IO开始讲起:
简单回顾IO流
先看这张图:
不知道你还记不记得java.io中的节点流
和处理流
!
节点流:是我们使用IO进行数据传输的核心所在,利用节点流我们可以来两点之间使用字节/字节流的方式传输数据。(例如FileReader、FileInputStream、等图中粗体)
处理流:它是建立在节点流的,相当于在节点流上套了一层。(每个处理流的创建,其构造函数中都需要一个节点流对象作为参数。)(例如BufferedReader、BufferedInputStream)
既然节点流已经可以完成IO流传输数据的工作,那么处理流的存在意义何在呢?!
往往只是使用低级的节点流,会有性能和效率的问题。例如没有缓冲区,就只能读完一个字节后,必须写出后才能读下一个,严重影响性能。
同时低级的节点流不具备处理复杂情况和特殊数据的能力。
那么处理流的存在的意义就体现出来了:
- 提高节点流的性能,提高读写效率。(例如BufferedInputStream,增加了缓冲区)
- 丰富、完善节点流的功能,通过套上各种处理流,使得封装后的流可以处理各种情况、数据,并且利用处理流的方法,简化读写操作!(例如readLine())
我们来看一个最简单的IO流代码:
package JavaIO_02;
import java.io.*;
/**
* 文件流
* DataOutputStream DataInputStream
* 1.写出后读取
* 2.读取顺序应与写出保持一致
*
* @author gyc
* @Data 2019/8/23
*/
public class DataStream {
public static void main(String[] args) throws IOException {
String Path = "Data.txt";
//DataOutputStream写出
DataOutputStream dos = new DataOutputStream(
new BufferedOutputStream(
new FileOutputStream(Path)));
dos.writeUTF("官宇辰");
dos.writeInt(817);
dos.writeBoolean(true);
dos.writeChar('g');
dos.flush();
//DataInputStream读取
DataInputStream dis = new DataInputStream(
new BufferedInputStream(
new FileInputStream(Path)));
//按写出的顺序读取
String msg = dis.readUTF();
int num = dis.readInt();
boolean flag = dis.readBoolean();
char ch = dis.readChar();
System.out.println(msg + num + flag + ch);
}
}
案例中FileOut(In)putStream是字节流,也就是我们所说的“低级流”
Buffered(In)OutputStream和Data(In)OutputSteam都是处理流。使用这两个处理流将字节流进行层层“包装”逐渐丰富了字节流。
- Buffered(In)OutputStream:提供了缓冲区,提高读写性能。
- Data(In)OutputStream:提供了丰富,便捷易用的数据读写方法。
装饰模式
这种一层套一层,对功能进行拓展的方式,在设计模式中有一个专属名称:装饰模式
在《大话设计模式》中是这样引出装饰模式的:
- 我们需要把所需的功能按正确的顺序串联起来进行控制
- 装饰模式(Decorrator),动态地给一个对象添加一些额外的职责。
装饰模式中还有一个特别重要的是:**将装饰的创建和使用分离!**每个装饰只关心其自己的核心功能!
那么我们再回头来看看IO流,里面的处理流就有点像“装饰类”,而节点流就像“被装饰类”!在进行装饰的时候并没有数据流通,而是在具体使用方法时,才开始真正读写数据工作!
为了更加形象表达装饰的使用和创建的分离,我们用一个经典的案例来模拟一下装饰模式(详细见GOF23学习笔记)!
总结IO、装饰模式并对比学习RDD
无论是在IO中,还是装饰模式中,都有些共同点!
- 大多都有共同的基类
- 每个处理流、装饰类都有自己的功能
- 流的链接或者说装饰的过程都是有顺序的
装饰模式的实施过程我们可以用一张图来描述:
这个图,就告诉了我们如何通过装饰模式将多个小的类按照需要组装成目标类!对基类进行功能拓展!
这?!听起来是不是有点思路了,我们再看回RDD的开始的介绍!所谓最小计算单元,其实就可以看作是装饰类中的一个小小的装饰组件!当我们把若干个最小计算单元进行组合是不是就能组成我们目标的复杂计算任务?!
我通过源码来看一下:
我们在WordCount的代码中,我们使用textFile()
读取文件后,我们就拿到一个RDD类型的对象
之后我们便是使用Spark中的flatMap()
、map()
等方法,起初开始我还傻傻的以为他们只是Scala中的函数,原来是经过改良了!他们的函数值无一例外都是RDD类型!!
还有reduceByKey()
也是,感兴趣可以去翻一翻,最终返回的是一个ShuffleRDD
!
除了返回值都是RDD类外,你会发现这些返回的RDD其构造函数中都有一个参数是RDD类型。并且使用的时候,都是传入函数调用者即图中的this
!并且这些RDD类都有一个共同基类RDD
!这是不是就有点像IO流呢?大体上完全符合装饰模式!
我们用一张图来还原一下:
然后我们回想一下在装饰模式中说的:装饰的创建和调用分离!
证明我们在使用flatMap,map,reduceByKey
并没有数据进来!我们只是在搭建一个数据处理模型,这个数据模型则是用于处理我们手中的数据!这就是装饰的创建!
可是当我们使用collect()
开始采集数据时,数据进入处理模型即这一层又一层包装的RDD,这才是装饰的调用!
这里面每一层的计算逻辑都使用RDD封装,数据在之间流动!(可以学习一下Java中流编程)说的简单点就是:计算逻辑是我们的加工工厂,我们按加工流程提前布置好各种加工器械,数据是原料,原料放入后经过各种器械流水加工后得到产品即结果!
所以这样一来,再复杂的计算任务都能够拆分成这样一层一层的来完成!
4.1.3、RDD认识
再复习一下它的名字:弹性分布式计算集
弹性(可变,自动转变)
- 存储的弹性:存储位置再内存和磁盘之间自动切换(内存并非无限)
- 容错的弹性:计算过程中数据丢失,自动恢复
- 计算的弹性:计算错误自动重新计算
- 分片的弹性:根据计算节点的资源变化自动重新分片
分布式
一个计算任务由数据加若干RDD关联组成,需要将数据分区发到集群中的各个节点上!
数据集
并不保存数据!封装了计算逻辑!
不可变(可能与弹性有些矛盾,要正确理解)
指的是封装的计算逻辑是不能修改的!想要修改就只能重新创建!
可分区、并行计算
由于是分布式的,放在不同的节点上运行,要对数据进行分区!
并且分区后发送到不同节点的数据是并行计算的!
4.1.4、RDD五大基本属性
直接上硬货!在RDD的源码注释中,已经将RDD的五大主要基本属性列了出来:
-
分区列表:既然是分布式并行计算,那么分区必然少不了,此属性是一个数据分区的列表!
-
分区计算函数:计算计算!总归要对数据进行操作,这个函数就是计算逻辑的描述!
-
对其他RDD的依赖列表:多个RDD之间,是有计算的先后关系的(回想DAG)。所以我们需要存下RDD之间的依赖关系!
-
*(可选的)*分区器:分区器用于定义数据分区的规则,但不是必须的!
/** Optionally overridden by subclasses to specify how they are partitioned. */ @transient val partitioner: Option[Partitioner] = None
-
*(可选的)*最优位置列表:我们在进行任务准备时,就已经知道了数据所在位置,所以每个Task是清楚的知道它所计算的数据存放在哪个结点上!但是如果当Task和数据不再一个结点上时,就出现了两种解决方式:
- 移动Task
- 移动数据
相比较而言,在数据量巨大的时候后者所要消耗的额外资源明显大于前者,所以就有了移动数据不如移动计算的原则!所以我们需要保证任务调度时尽可能将任务发送到其计算数据所在的节点上!而这个优先位置列表就是任务的最佳计算位置!
4.1.5、执行原理(Yarn模式下)
执行原理主要分为以下几个步骤:
- 申请资源
- 解析Spark程序得到计算任务
- 计算节点执行任务,按处理模型对数据计算
- 得到计算结果
由于是部署在Yarn上,所以我们使用的是ResourceManager
和NodeManager
进行统一资源管理!他们分别扮演的是Master和Worker的角色!
而我们的Spark作为计算引擎,需要启动自己的计算相关的进程:Driver
以及Executor
!
这四者的关系用一张图表示就是这样:
Driver将多个关联的RDD进行串联,最终得到我们的Task,然后将Task放入一个TaskPool(任务池)中,然后Executor到任务池中拉取对应的任务进行执行!
4.2、RDD基础编程
了解完RDD的作用,我们来看一下RDD在程序中如何使用!下面进入RDD的基础编程:
4.2.1、RDD创建
创建RDD共有四种方式:
- 从内存(程序集合)中创建
- 从文件中创建
- 从外部创建
- 使用new关键字创建
后两者主要在Spark源码中利用,而前面两者则是我们学习的创建RDD的关键!
从内存中创建RDD
从内存即从程序中的集合创建一个RDD!使用SparkContext
的parallelize(Seq seq)
方法!方法的Sequence参数即我们要创建RDD的集合对象!parallelize(平行化)
object CreateRDD {
def main(args: Array[String]): Unit = {
// 环境准备
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
// parallelize方法,返回一个RDD对象
val result = sc.parallelize(List(1, 3, 5, 7, 9))
result.collect().foreach(println)
/**
* 1
* 3
* 5
* 7
* 9
*/
// 关闭Spark连接
sc.stop()
}
}
这里我偶然发现了一个问题,我现在还不是很明白为什么:当我对result不使用collect()
方法时,直接遍历RDD进行输出,结果每次都不一样。但是我使用collect()方法后将RDD转为Array后输出,内容就会保持不变?!这是为啥?!
使用parallelize
方法,我们并无法很好理解平行化和RDD之间的关系,Spark为我们提供了一个简单明了的RDD创建方式:makeRDD()
,这样就好记多了!
val result = sc.makeRDD(List(2, 4, 6, 8, 10))
其实 makeRDD()
的底层还是调用了parallelize()
:使用makeRDD只是方便我们记忆~~
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices) // 调用parallelize()
}
从文件中创建
这个应该很熟悉了,使用textFile()
方法:
object FileCreateRDD {
def main(args: Array[String]): Unit = {
// 环境准备
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
// 使用textFile创建RDD
val result = sc.textFile("datas")
result.collect().foreach(println)
// 关闭Spark连接
sc.stop()
}
}
textFile读取文件是逐行读取的,至于参数中路径的的写法有很多种,案例中这种写法表示读取文件夹下所有的文件!也支持使用通配符,正则表达式!
还介绍另外一种:wholeTextFiles()
// 使用wholeTextFiles
val result2 = sc.wholeTextFiles("datas")
这种方法读取文件是以文件为单位读取,结果是一个二元组Tuple2[String, String]
!
分别存放文件的路径,以及文件内容!多个文件得到的结果就是一个二元组的列表!Array[(String, String)]
4.2.2、分区与并行度
4.2.2.1、makeRDD分区数量
RDD中是要进行分区的,而并行度则是取决于分区!但是分区了并不一定是并行!(得看你的Executor和机器)
在学习使用makeRDD
创建RDD的时候,你一定注意到了它除了一个Seq参数,还有一个名为numSlices
的Int参数,而这个参数就是决定RDD的分区数量!
为了方便验证,我们可以使用saveAsTextFile
将RDD以文件的形式输出,一个分区对应一个文件。
诶!果然不错!那我们如果不写是不进行分区吗?!
然而结果并不是这样:
那么我们到源码中去看一番吧:看看这个默认值到底是多少!
在makeRDD的源码中我们确实能看到numSlices是有一个默认值的:defaultParallelism
然后你会发现这是TaskScheduler
这个Trait的一个抽象方法,而这个特质有一个实现类:
TaskSchedulerImpl
,这个实现类中对此抽象方法的实现则是调用SchedulerBackend
特质中的抽象方法,而这个Backend特质有一个实现类:LocalSchedulerBackend
符合我们本地开发模式,其中对此抽象方法的实现是:
scheduler.conf.getInt("spark.default.parallelism", totalCores)
而这里的conf则正是我们进行Spark连接时所创建SparkConf!
而我们并没有设置spark.default.parallellism
这个配置项,那么则会使用totalCores
作为这个默认值!
我们来设置一下这个配置项,看是否会对我们的分区结果产生影响:
符合我们的预期!
也就是说我们的totalCores,默认情况下是12?!其实他还受到一个因素的影响!只是我们一直没有提到:
就是我们sparkConf.setMaster("local[*]")
这里的local[*]
表示模拟我们机器的所有核心执行程序,而本机的CPU核心为12核心,那么totalCores理所当然就是12啦!可是当你写的是local
时,则表示使用单线程来模拟单核心来执行程序,那么totalCores就是1啦!
写local[2]、local[18]、local[n]
totalCores就是n,因为是模拟的,所以n没有上限!
4.2.2.2、makeRDD分区规则
了解完分区数量的设定,再来看看数据是按照什么分区的吧:
1,2,3,4,5,6,7,8,9,0 十个数,让你分四个区,是不是会有很多种分法,来看一下我们在使用默认分区器时,分区规则是什么吧。
默认分区器的分区结果是:
- 分区一:1,2
- 分区二:3,4,5
- 分区三:6,7
- 分区四:8,9,0
我们直接到源码里面去找到分区规则:
-
进入makeRDD源码,然后查看parallelize源码
-
parallelize中返回了一个
ParallelCollectionRDD
对象def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() // 返回ParallelCollection对象实例 new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) }
-
进入此类,查看其
getPartitions
方法override def getPartitions: Array[Partition] = { val slices = ParallelCollectionRDD.slice(data, numSlices).toArray slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray }
利用了
slice
方法,对数据进行了分区。我们现在来看看他这个静态方法: -
slice方法源码:
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { if (numSlices < 1) { throw new IllegalArgumentException("Positive number of partitions required") } def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } seq match { case r: Range => // code case nr: NumericRange[_] => // code case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq }.toSeq } }
里面定义了一个方法
positions
, 但是主要部分是对序列进行模式匹配,由于前两个case并不是我们这里的重点,所以将代码省略。。(感兴趣的可以看看)默认匹配情况下:
- 先将集合转化为数组
- 然后将数组长度(即集合元素数量)以及分区数量作为positions函数的参数
- 通过positions函数,获取确定每个分区的数据数组的索引范围
- 然后根据分好的索引范围对数据进行分片
所以我们可以手动来还原一下分区的过程:
-
数据转为数组:[1, 2, 3, 4, 5, 6, 7, 8, 9, 0],arrayLength = 10, numslices = 4
-
positions函数:
输入=> 10, 4 得到的分区数据索引范围: 0 => ((0 * 10) / 4,(1 * 10)/4) => (0, 2) 1 => ((1 * 10) / 4,(2 * 10)/4) => (2, 5) 2 => ((2 * 10) / 4,(3 * 10)/4) => (5, 7) 3 => ((3 * 10) / 4,(4 * 10)/4) => (7, 10)
-
对照索引的范围使用slice方法对数据进行切分
注意集合中的slice()
方法的切片,参数是from和until,即是左闭右开区间最终结果就是:
- 分区一:[0,2) => 1,2 - 分区二:[2,5) => 3,4,5 - 分区三:[5,7) => 6,7 - 分区四:[7,10) => 8,9,0
诶嘿,和我们最初始的结果一模一样!
4.2.2.3、文件数据分区
上面都是集合的数据分区,相比较而言,文件数据的分区要更为复杂:
textFile()
同样也有一个分区相关的参数:minPartitions
说明这个分区数量并无法由我们自己掌控,但是我们可以给出最小分区数!
这个参数的默认值defaultMinPartitions
在底层是这样:
// defaultParallelism和makeRDD相同!
math.min(defaultParallelism, 2)
看完分区数量的设置,我们来看重点:数据如何分区!?
要知道文件中有些内容是不能随意拆分的!(例如:WordCount里你要是把一个单词拆成了俩,就会出问题!)
我们先来看一个案例:
数据:
1234567
89
0
minPartitions=3; 结果是
- 分区一:123456
- 分区二:789
- 分区三:空
- 分区四:0
我们进入到textFile的源码中去一探究竟吧
-
首先看到的是熟悉的东西:
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path)
TextInputFormat、LongWritable、Text,浓浓的Hadoop气息!
-
再往里面走,就能看到HadoopRDD了
-
还是老样子,看他的
getPartitions
方法 -
捕捉到重点的一行:
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
getSplits应该就是获取切片了
-
然后我们查看getSplits的源码(FileInputFormat实现类的)
注意有个
totalSize
记录着所有要读取文件的字节大小!说到这里不禁要提一嘴!小心行末的
\r\n
(CRLF)或者\r
(LF)也是要算字节的哦!果不其然,我这数据文件并不是简简单单的10字节而是14个字节!
然后利用totalSize以及传入的minPartitions(即代码中的numSplits)计算出goalSize:
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
而后面这个minSize呢:
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // SPLIT_MINSIZE需要配置此项: public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
由于我们没有配置这个SPLIT_MINSIZE,所以get到的就是默认值1,而minSplitSize也等于1,那么最终minSize也就等于1
然后就是我们在HadoopMR中学过的内容了:(复习MapReduce中Job切片的源码一节)
long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize);
computeSplitSize里面的计算公式:还强调为重点:只是maxSize变为了goalSize:
Math.max(minSize, Math.min(goalSize, blockSize));
最终确定了切片的大小就是goalSize!!
然后就是按照字节数对内容进行切分了,就没什么好说的了~
但是在文件读取和切片过程中要记住:**是以偏移量为单位进行读取切片的!**并且为了避免数据的重复,不会重复读取偏移量!但是读取方式是逐行读取,也就是说如果一行中有超过分区大小的数据,虽然你只要一部分,但是实际上你已经读取了一整行!这一行已经不会再读第二次了!所以为了保证数据不丢失,你只能将这一整行数据塞到一个分区中!(至于前面的结果里面为啥中间有一个空分区,我也不太明白~)
4.2.3、RDD算子
上面我们已经学会了如何创建出RDD,下面我们就要学习RDD中的一些方法了
例如map、flatMap、collect等,由于他们和Scala中的集合操作的方法实在是太相似了,所以为了区分,我们称RDD的方法为算子!所谓算子可以简单的认为是一个操作(Operate)!
RDD的算子主要分为两大类:
-
转换算子:主要是将一种RDD转化为另一种RDD,完善和增强功能(类似于任务的封装过程)
- map、flatMap等
-
行为算子:触发任务、作业的执行。
- collect等
4.2.4、转换算子
转换算子按照计算方式又分为:
- 单值转换算子
- 双Value转换算子
- Key-Value转换算子
我们比较熟悉的map
,flatMap
就不多说具体的使用了。
4.2.4.1、mapPartition
在学习这个算子之前,我们先回想一下map()
,我们通过一个案例来看一下map算子的执行过程:
object MapPartitionsTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
val mapRDD = rdd.map {
number => {
println(">>>>>>>>>>" + number)
number
}
}
val mapRDD1 = mapRDD.map {
number => {
println("++++++++++" + number)
number
}
}
mapRDD1.collect()
// 关闭Spark连接
sc.stop()
}
}
两个RDD进行串联,都是使用map算子!只有一个分区,猜猜看结果是如何?
可见,**分区内数据的串行执行的计算的!**也就是说必须等上一个数据完全完成了计算才能开始下一个数据!可是当我们将其分为两个区后:
可以看出分区内数据依然是串行计算,但是分区之间是并行计算的!
mapPartitions
:是以分区为单位进行数据处理的!
object MapPartitionsDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
val mapPartitionsRDD = rdd.mapPartitions(
iter => {
iter.map(
number => {
println(">>>>>>>>>" + number)
number
}
)
}
)
val mapPartitionsRDD1 = mapPartitionsRDD.mapPartitions(
iter => {
iter.map(
number => {
println("+++++++++" + number)
number
}
)
}
)
mapPartitionsRDD1.collect()
// 关闭Spark连接
sc.stop()
}
}
结果和前面使用map的结果一样!
mapPartitions唯一的不同是:处理的单位是一个分区的数据,所以参数函数的输入是一个迭代器,输出还是一个迭代器!
那么它相对于map有哪些优点呢?
在分区数据间进行计算得出结果的活,map无法胜任!(例如:取出每个分区中的最大值?)
map在处理数据的时候,压根就不管你是哪个分区的!但是mapPartitions处理的数据是收到分区制约的,所以很容易实现,分区数据间的计算!
// 找出每个分区中最大值
object MapPartitionsTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val mapPartitionsRDD = rdd.mapPartitions(
iter => {
// 包装成List,并返回迭代器
List(iter.max).iterator
}
)
mapPartitionsRDD.collect().foreach(println)
/**
* 3
* 6
*/
// 关闭Spark连接
sc.stop()
}
}
好虽好,但是也有缺陷,mapPartitions存在对分区的引用,若分区数据较大,同时加载到内存中可能导致内存被大大消耗,最终导致作业失败!
4.2.4.2、mapPartitionsWithIndex
当我们需要对不同的分区的数据做不同的操作时,我们必须要清楚数据所在分区,而mapPartitions
并不能给出分区的信息,但是他的升级版:mapPartitionsWithIndex
就可以做到!
object MapPartitionsWithIndexDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
val resultRdd = rdd.mapPartitionsWithIndex(
(idx, iter) => {
idx match {
case 0 => iter.map(_ * 2)
case _ => iter.map(_ * 3)
}
}
)
resultRdd.collect().foreach(println)
/**
* 2
* 4
* 6
* 12
* 15
* 18
*/
// 关闭Spark连接
sc.stop()
}
}
可以看出,参数函数的输入的值不再是单一的迭代器,而是一个Int、一个迭代器!这个Int参数,即表示分区编号(从0开始!)
所以我们可以使用模式匹配,来判断分区,以达到对不同分区做不同操作的目的!
4.2.4.3、glom
glom
算子:主要作用是将分区的数据转化为内存数组!(便于使用Array中的方法~)但是分区不可变(即转化后数据来自同一个分区,数量与分区数据一致!)
object GlomDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
// 返回RDD[Array[Int]]
val glomRDD = rdd.glom()
glomRDD.collect().foreach(array => println(array.mkString(",")))
// 关闭Spark连接
sc.stop()
}
}
将分区数据转化为Array后,我们就可以利用Array中的方法简化问题的解决过程了!
4.2.4.4、groupBy
这个groupBy的使用在前面的WordCount的入门案例中使用过,具体使用方法和Scala中的集合方法也是一样的,这里主要说一个注意点!
前面的几个算子,都不会导致分区的数据混乱(即一个分区的数据经过算子操作后会到其他分区中!)
但是groupBy是对数据进行分组,那么就很有可能两个不同区的数据会被分到一个group!那么就有点像MR中的Shuffle过程,数据大洗牌!不管来自哪个分区,经过分组后全都混在一起!
object GroupByDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("Hello", "World", "Spark", "Scala", "Hi"),2)
// 以单词的手写字母分组
val groupRdd = rdd.groupBy(
word => {
word.charAt(0)
}
)
groupRdd.collect().foreach(
wordGroup => {
println(wordGroup._1 + "=>" + wordGroup._2)
}
)
/**
* H=>CompactBuffer(Hello, Hi)
* S=>CompactBuffer(Spark, Scala)
* W=>CompactBuffer(World)
*/
// 关闭Spark连接
sc.stop()
}
}
4.2.4.5、filter
作用就不用多说了,来说一说它的问题吧:
在大数据情况下:一旦过滤的条件选择不当,可能导致严重的数据倾斜!(即过滤后一个分区的数据极多,而另一个分区数据所剩无几~)
谨慎使用!!
4.2.4.6、sample
sample
算子用于对分区数据进行抽样!方法有三个参数:
-
withReplacement:Boolean
:是否将抽取的数据放回(true:放回,底层执行泊松分布算法;false:不放回,底层执行伯努利实验算法)
-
fraction:Double
- 当参数一为false时,fraction表示每个数据被抽中的概率。但是由于是伪随机,所以可以当做是基准值。!取值在[0,1]区间内
- 当参数一为true时,fraction表示每个数据被期望抽到的次数,(实际结果可能大于也可能小于)。取值范围 >=0
-
seed:Long = Utils.random.nextLong
前面说了这个抽样是一个伪随机的,所以要利用随机数进行模拟,那么就需要一个种子,不写的话使用默认值是一个伪随机数!
object SampleDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),1)
println("=========不放回===========")
rdd.sample(
false,
0.28
).collect().foreach(println)
/**
* =========不放回===========
* 3
* 4
* 5
* 6
* 8
*/
println("=========放回===========")
rdd.sample(
true,
1
).collect().foreach(println)
/**
* =========放回===========
* 1
* 1
* 1
* 1
* 3
* 3
* 6
* 7
* 8
* 9
*/
// 关闭Spark连接
sc.stop()
}
}
这个抽样的算子,在开发中到底有什么用呢?!
前面我们说到过某些操作可能会导致严重的数据倾斜,那么我们就可以利用抽样算子,提前对数据进行抽样分析,并提前对分区数据进行均衡!以避免后续的数据倾斜!
4.2.4.7、distinct
用于分区的数据去重!(包括分区内、分区间!)
object DistinctDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 3, 3, 4, 3, 4, 5, 6), 2)
rdd.distinct().collect().foreach(print)
/**
* 46135
*/
// 关闭Spark连接
sc.stop()
}
}
distinct
有一个参数:numPartitions:即去重后数据分区数量,不填则使用去重前数据分区数!
其实RDD中的distinc算子和集合中的distinct方法在底层原理上还是有很大的区别的!
集合的distinct方法:
源码中使用了HashSet!(Set中不允许重复的数据!)
RDD中distinct算子:
相对来说要复杂一点,可是仔细一看发现和WordCount的过程差别不大。所以理解起来也是比较容易的!将元素数据变为KV类型,数据为Key,Value为null(随便,无关紧要)。然后按照Key进行分组聚合到这一步就已经达到了去重的目的了!
4.2.4.8、coalesce
这个算子有两个用法:缩减分区 / 扩充分区
coalesce
算子的参数:
- numPartitions:Int => 缩放后的分区数量
- shuffle:Boolean => 缩放分区是否进行shuffle
缩减分区
为什么要缩减分区?
当数据经过过滤后,得到小数据集,如果单独使用一个Executor来计算小数据集的话。感觉有点杀鸡用牛刀!于是我们就想到将小数据集的分区合并成一个大的分区!
object CoalesceDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
// 三个小数据集分区【1,2】【3,4】【5,6】
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),3)
// 缩减成为两个分区【1,2】【3,4,5,6】
rdd.coalesce(2).saveAsTextFile("output")
// 关闭Spark连接
sc.stop()
}
}
可以看到,当我们不设置第二个参数时默认使用false,即不使用shuffle。不会改变分区(即同一个分区的数据还是在一个分区中!)这就可能导致一个严重的问题:数据倾斜!。(案例中我们想要的缩减后分区是:【1,2,3】和【4,5,6】,可是没有进行shuffle,所以3和4必须在一个分区中!)
开启shuffle后:
// 得到两个平衡的分区【2,3,6】【1,4,5】
rdd.coalesce(2, true).saveAsTextFile("output")
扩充分区
当需要扩充分区时,参数一的值必须大于原分区数!并且一定一定一定要开启shuffle!!否则毫无作用!不会进行分区扩充!(什么原因不用我多说了吧!)
// 两个分区
val rdd2 = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 2)
// 扩充为4个分区【2,8】【3,5】【4,6】【1,7】
rdd2.coalesce(4, true).saveAsTextFile("output2")
4.2.4.9、sortBy
前面学了groupBy是按照规则分组,这里则是按照规则排序
参数:
-
f:T=>K
=> 一个定义规则的函数,参数为元素,返回值是用于排序的Key -
ascending:Boolean
=>是否升序?! -
numPartitions:Int
=>排序后分区数,默认使用排序前分区数量
object SortByDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
// 三个小数据集分区【1,2】【3,4】【5,6】
val rdd = sc.makeRDD(List(41, 27, 32, 74, 15, 6),3)
val result = rdd.sortBy(
number => number,
true
).collect().mkString(",")
println(result)
/**
* 6,15,27,32,41,74
*/
// 关闭Spark连接
sc.stop()
}
}
4.2.4.10、交集、并集、差集、Zip
这里都是双值的转换算子!
交集
intersection
val rdd = sc.makeRDD(List(1, 2, 3, 4, 7))
val rdd2 = sc.makeRDD(List(4, 5, 6, 7, 8))
// 交集 intersection [4,7]
println(rdd.intersection(rdd2).collect().mkString(","))
并集
union
// 并集 union [1,2,3,4,7,4,5,6,7,8]
println(rdd.union(rdd2).collect().mkString(","))
差集
subtract
// 差集 subtract [1,2,3]
println(rdd.subtract(rdd2).collect().mkString(","))
拉链
zip
// 拉链 zip [(1,4),(2,5),(3,6),(4,7),(7,8)]
println(rdd.zip(rdd2).collect().mkString(","))
注意事项:
-
现在我们测试的数据是同类型的!当rdd中数据类型不相同时,交集、并集、差集都是做不了的(编译会报错类型不匹配!)但是不影响zip!!
交并补算子的调用rdd的数据类型、参数的rdd数据类型以及返回值rdd返回值类型都是一样的!
-
zip的使用注意:
Can’t zip RDDs with unequal numbers of partitions,
Can only zip RDDs with same number of elements in each partition
当RDDs的分区数量不一致的时候不能使用zip!分区的的数据要一致才能使用zip!(在集合中数据长度不一致是可以zip的!但是这里考虑到数据丢失的问题,所以不允许!)
要使用Zip要保证两个RDD的数据分区和数据长度要一致!!
4.2.4.11、partitionBy
这里就是KV类型的转换算子了!
当你试图用一个RDD[T]对象来调用partitionBy的时候,惊奇地发现没有这个方法?!
是我搞错了?!版本错了?!并不是!!因为partitionBy是处理K-V类型的数据的,所以我们改变一下数据:
val rdd = sc.makeRDD(List(1, 2, 3, 4, 7, 9))
val mapRDD = rdd.map((_, 1))
这样一来就可以使用partitionBy了!当你点过去准备看为什么这么邪乎的时候,你发现这个玩意根本就不是一个RDD的方法!!
而是PariRDDFunctions这个类的的方法,并且这个类的主构造函数的参数只有一个RDD[(K,V)]
,并且你还看到了一个熟悉的方法reduceByKey,以及我们接下来要学习的groupByKey!
一个RDD竟然可以调用非RDD类的方法,不由得想到了**隐式转换!**果不其然,在RDD的伴生对象中发现了这样一个隐式转换函数:
这样说来前面那个类的主构造函数也就不奇怪了~ 同样的隐式转换还有很多!!
弄明白怎么回事,我们来看看partitionBy的用法:
这个算子主要是用于对数据进行重分区!!
参数需要一个分区器!可以使用自带的分区器(e.g. HashPartitioner
)HashPartitioner的参数表示分区的数量!
object PartitionByDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 7, 9))
val mapRDD = rdd.map((_, 1))
mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")
// 关闭Spark连接
sc.stop()
}
}
对HashPartitioner的分区规则感兴趣的可以看一看源码~~
当RDD的当前分区器和partitionBy使用分区器相同,并且分区相同时,还会进行分区吗?
点开partitionBy的源码你就会发现他会对当前rdd的分区器和参数传入的partitioner进行一次比较,如果比较相同则直接返回当前的RDD(即已有分区,跳过多余的分区操作)
可是这个分区器的判断所使用==
最终都会使用分区器的equals
方法,我们来看看HashPartitioner的equals方法:
可以看到对于HashPartitioner来说,首先你要类型匹配,其次分区数相同,就认定两者匹配成功!至于其他的分区器,感兴趣的可以看看!
还有其他分区器吗?如何自定义自己的分区器?
由于PythonPartitioners是私有的,只有指定包下可用。所以可以供我们使用的就只有
HashPartitioner
RangePartitioner
如果要自定义一个分区器,需要继承partitioner
这个抽象类,并实现里面的两个方法:
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
后面我们会详细介绍
4.2.4.12、reduceByKey
这个算子,我们在入门学习WordCount案例时就用过了!它主要的功能是按Key分组、聚合Value!
还是简单复习一下:
object ReduceByKeyDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 1, 2, 3, 4, 5, 4, 6, 7, 6))
val mapRDD = rdd.map((_, 1))
mapRDD.reduceByKey(_ + _).collect().foreach(println)
// 关闭Spark连接
sc.stop()
}
}
reduceByKey中的参数中,**只需要定义聚合函数(即Value的聚合方式!)**由于处理的数据是KV类型的,所以会自动按照Key进行分组!然后同组内对Value按照给出的聚合函数进行聚合!
注意:
**并不是所有的分组都会进行聚合!**聚合的函数进行这样改写后
mapRDD.reduceByKey {
(v1,v2) => {
println("一次聚合操作~")
v1 + v2
}
}.collect().foreach(println)
结果是:只有三次输出(即聚合函数只调用了三次!!)
说明:那些只有一个键值对的分组不会调用聚合函数!!
4.2.4.13、groupByKey
很明显和上面的ReduceByKey相比,这个算子只是用于对数据进行分组!返回值也是一个二元组!
-
_1
是此分组的Key! -
_2
是此分组内所有的KV中的Value组成的一个集合!
object GroupByKeyDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
1->"a", 2->"b", 3->"c", 2->"d", 1->"e", 4->"f", 3->"g"
)
)
rdd.groupByKey().collect().foreach(println)
/**
* (1,CompactBuffer(a, e))
* (2,CompactBuffer(b, d))
* (3,CompactBuffer(c, g))
* (4,CompactBuffer(f))
*/
// 关闭Spark连接
sc.stop()
}
}
唠一唠reduceByKey和GroupByKey一些区别:
先来说一下他们的共通点:都会按照Key进行分组!但是有一个问题一直存在疑问?!我们的数据明明是分区的!也就是说每个区数据计算是分开的?那么如何做到将不同区的数据一起分组呢?!
图示:
例如图中分组的数据是来自两个分区的,但是如何判断是否每个分区都完成了?如果图中过程1已经完成了但是过程3很慢,那么group1的后续操作就一直等着吗?等多久呢?
这样一直等着,内存就会一直被占用,那么多个作业都这样的话,就很容易出现内存超载!
其实这个过程涉及到了==分区数据的shuffle!==而为了避免带来内存压力,shuffle操作转到了文件中进行,即将结果写入磁盘中,我们称落盘!
这样一来,就不会导致内存过载的问题了!但是会降低任务的性能。
总结一句重要的话:所有的Shuffle操作都要落盘!
说完了分组过程中的细节,再来看两者之间的其他地方的一些区别:
reduceByKey相比较于groupByKey优点在于,它可以提前对分区内的数据进行预聚合!(即Combine)
想一想,我们使用reduceByKey最终我们的结果是:(Key,所有相同Key的Value按照设定的计算规则进行聚合的结果)
那什么时候聚合不都是一样吗?!总之不会影响最后的结果!
但是groupByKey呢?就比较苦逼了,只要求他分组,其他的啥也不许干!
那么最终结果就是:reduceByKey在shuffle过程中往文件中写的数据量就小一些,groupByKey要写的数据量就大一些!所以性能的提升就体现出来了~
图示:
看吧,进行了预聚合后在文件写入时数据量大大减少~ 这就是reduceByKey的优化性能的手法
4.2.4.14、aggregateByKey
在学习reduceByKey的时候,我们的算子的参数是一个Value的聚合规则函数~ 无论分区内还是分区间都是使用这个规则!可是当我们遇到分区内和分区间计算规则不同时,reduceByKey就无法满足我们的要求了~
例如:一批数据分区后,分区内求最大值,分区间Value累加(简单说就是将每个分区的最大值进行求和~)
先看案例代码:
object AggregateByKeyDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a"->1,"b"->3,"a"->4,"a"->2,"b"->5,"a"->3,"b"->2,"b"->1
),
2
)
val i = rdd.aggregateByKey(0)(
(v1, v2) => Math.max(v1, v2),
_ + _
).collect().foreach(println)
/**
* (b,8)
* (a,7)
*/
// 关闭Spark连接
sc.stop()
}
}
首先我们来讲一下aggregate
算子的参数:很明显参数是柯里化的
- 第一个参数列表:
- zeroValue:分区内计算的初始值~ 第一个元素按照分区内规则计算的话还缺少一个Value,这个初始值就是这个工具人~
- partitioner:分区器
- 第二个参数列表:
- seqOp:分区内数据计算规则~ 这个函数的参数类型和zeroValue的类型是有一点挂钩的!返回值和第一个参数值类型与zeroValue相同~~
- combOp:分区间数据计算规则~ 这个函数的参数类型、返回值类型统统与zeroValue类型相同!
再看回案例代码:
由于我要找到分区中最大值所以,我们zeroValue设置为0!
然后是分区内Value计算规则:(v1, v2) => Math.max(v1, v2)
,找出分区Key形同的中最大Value!
然后是分区间Value计算规则:_ + _
,将相同Key的Value进行求和!
我们用一张图来模拟一下过程:
可以看出最终的结果和我们给出的zeroValue以及分区内、分区间的计算逻辑紧密相关,并且分区数改变也是会改变结果的哦!
例如我们将分区改为4后:分析结果应该为:(a,1+4+3), (b,3+5+2) => (a,8), (b,10)
,运行结果果然不出所料!!
4.2.4.15、foldByKey
它是aggregateByKey的一个简化~ **分区内、分区间的计算逻辑相同!**但是区别于reduceByKey的是它有一个zeroValue!
还是利用上面的数据,我们利用foldByKey,完成reduceByKey的功能:
object FoldByKeyDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a"->1,"b"->3,"a"->4,"a"->2,"b"->5,"a"->3,"b"->2,"b"->1
),
2
)
// 等同于rdd.reduceByKey(_+_)
rdd.foldByKey(0)(
_ + _
).collect().foreach(println)
/**
* (b,11)
* (a,10)
*/
// 关闭Spark连接
sc.stop()
}
}
使用aggregateByKey然后将分区间和分区内计算逻辑写成一样的效果相同!
现在发现一个小问题噢噢噢噢~ 如果将zeroValue定为1, 结果为:
(b,13) (a,12)
如果将分区数设置为4, 结果为:
(a,13) (b,14)
我们一个个分析:
第一种情况:我们将zeroValue设置为1后,因为有两个分区,每个分区都有一个zeroValue,所以经过分区内计算后结果是:(a,8)(b,4)
和(a,4)(b,9)
,然后进行分区间计算得到结果:(a,12)(b,13)
第二个情况:设置为四个分区后,就有了4个zeroValue。但是有两个特殊的分区:没有key=a、没有key=b。所以说这两个分区也就没有(a,1)或(b,1)这俩初始值!所以三个分区的分区内计算结果是:(a,2)(b,4)
、(a,7)
、(a,4)(b,6)
、(b,4)
,最终分区间计算的结果:(a,13)(b,14)!
所以zeroValue的设定对结果的影响一定要提前预估!
插个话题:zeroValue对计算逻辑、计算结果的影响!
我们在介绍aggregateByKey的参数时,就着重强调了zeroValue的类型和两个计算逻辑函数之间的关系。
zeroValue:U
、SeqOp(U,T)=>U
、CombOp:(U,U)=>U
可以看出zeroValue是整个计算的基石!
最终返回的结果类型一定是与zeroValue一致的!而不是以分区数据中的Value类型为主!
foldByKey会自动推导zeroValue的类型~ 但是aggregateByKey不会
案例演示:
object ZeroValueTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a"->1,"b"->3,"a"->4,"a"->2,"b"->5,"a"->3,"b"->2,"b"->1
),
2
)
rdd.aggregateByKey("hello")(
_ + _,
_ + _
).collect().foreach(println)
/**
* (b,hello3hello521)
* (a,hello142hello3)
*/
// 关闭Spark连接
sc.stop()
}
}
数据中Value为Int类型,但是我zeroValue使用String类型并没有报错,计算过程中所有的Value以字符串的形式进行拼接!并且有多个zeroValue在一起证实了我们前面所说的每个分区都有一个独立的zeroValue并且会参与到计算中!
那如果我们反过来,数据的Value用String类型,zeroValue用Int呢?连编译都过不了,因为那两个函数的返回值类型都对不上!
这个规则就给了我们转换数据的结构的机会,我们可以按需要设置zeroValue,最终拿到满意结构的数据!
下面我们就用一个案例来说明:
**利用aggregateByKey求出key相同的Value的平均值!**看到题我们就知道有两件事情要做:
- key出现的次数
- key相同的所有value求和
所以我们的zeroValue应该是这样:(cnt:Int, sum:Int)
然后我们数据的结构由(key:String, value:Int)
变为了(key:String, value:Tuple[Int,Int])
,然后我们对Value的结构进行一下变化即可得到结果!
object AggregateByKeyDemo02 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a"->1,"b"->3,"a"->4,"a"->2,"b"->5,"a"->3,"b"->2,"b"->1
),
2
)
// 统计次数,并对value求和
val cntRDD = rdd.aggregateByKey((0, 0))(
(result, value) => {
(result._1 + 1, result._2 + value)
},
(resA, resB) => {
(resA._1 + resB._1, resA._2 + resB._2)
}
)
/**
* (b,(4,11))
* (a,(4,10))
*/
// 对统计结果value进行处理
cntRDD.mapValues(
result => {
// sum / cnt
result._2 * 1.0 / result._1
}
).collect().foreach(println)
/**
* (b,2.75)
* (a,2.5)
*/
// 关闭Spark连接
sc.stop()
}
}
功能都实现了,但是美中不足:我们使用初始值的样子是(a, (0,0))
,感觉就好像从外部引入了一个与分区数据无关的数据,我能不能只用分区中的数据完成呢?把第一个数据(a,n)
变形为(a,(1,n))
,然后进行后续的计算~ 我们自己变形,做初始值感觉不用担心外部引入数据的影响~
但是谁会给你第一个数据变形的机会呢?CombineByKey: 爷来~
4.2.4.16、CombineByKey
上面说了CombineByKey给了第一个数据一次变形成为初始值的机会。
我们来看一下它的参数:(例如我们数据的value都是Int类型)
-
createCombiner:Int=>C
这个就是第一个数据的value变形的函数,返回的C类型就相当于zeroValue,决定了计算结果的结构
-
mergeValue:(C,Int)=>C
: value的合并,等同于上面的分区内计算 -
mergeCombiners:(C,C)=>C
:combiner的合并,等同于上面的分区间计算
话不多说,开干!
object CombineByKeyDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a"->1,"b"->3,"a"->4,"a"->2,"b"->5,"a"->3,"b"->2,"b"->1
),
2
)
// 统计次数,并对value求和
val cntRDD = rdd.combineByKey(
firstValue => (1, firstValue),
// 说明:由于变形后是类型的动态的,无法自动推导,需要手动指定
(res: Tuple2[Int, Int], value) => {
(res._1 + 1, res._2 + value)
},
(resA: Tuple2[Int, Int], resB: Tuple2[Int, Int]) => {
(resA._1 + resB._1, resA._2 + resB._2)
}
)
/**
* (b,(4,11))
* (a,(4,10))
*/
// 对统计结果value进行处理
cntRDD.mapValues(
result => {
// sum / cnt
result._2 * 1.0 / result._1
}
).collect().foreach(println)
/**
* (b,2.75)
* (a,2.5)
*/
// 关闭Spark连接
sc.stop()
}
}
combineByKey的后两个参数函数,因为变形是动态的,无法推断类型需要手动指定Combiner的类型(即参数中类型C需要写明!!)。计算逻辑和aggregateByKey完全一样!!
只是使用一个转换函数替换了zeroValue,现在的初始值是从我们数据中的value变形得来的,而不再是利用外部现成的!
reduceByKey、aggregateByKey、foldByKey、combineByKey共同点
底层都调用combineByKeyWithClassTag
, 并且这个方法由很多重载!
但是几个方法的区别主要集中在这三个参数:
createCombiner: V => C
mergeValue: (C, V) => C
mergeCombiners: (C, C) => C
感兴趣可以点进去,对比看看~~
4.2.4.17、Join
学过MySQL应该不会对这陌生,通常用于联表查询~
这里join
算子的作用是:
将(K,V)类型和(K,U)类型的RDD,将Key相同Value绑定成为二元组(K, (V,U))~(V,U无限制,可以相同)
例如:
rdd1: "a"->1, "b"->3, "c"->6
rdd2: "b"->"B", "a"->"A", "c"->"C"
> rdd1.join(rdd2)
result: (a, (1,"A")), (b, (3,"B")), (c, (6,"C"))
案例演示:
object JoinDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4
),
2
)
val rdd2 = sc.makeRDD(
List(
"a" -> "A", "b" -> "B", "c" -> "C", "d" -> "D"
)
)
rdd.join(rdd2).collect().foreach(println)
/**
* (a,(1,A))
* (b,(2,B))
* (c,(3,C))
* (d,(4,D))
*/
// 关闭Spark连接
sc.stop()
}
}
几个特殊情况:
-
rdd1中有key,而rdd2中没有:不会出现在结果中!
-
rdd1中有重复的key:结果中出现两次,分别于rdd2中的value绑定
>rdd1: "a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "a" -> 0 >rdd2: "a" -> "A", "b" -> "B", "c" -> "C", "d" -> "D" (a,(1,A)) (a,(0,A)) (b,(2,B)) (c,(3,C)) (d,(4,D))
-
rdd1和rdd2中同时存在一个key的重复:笛卡尔积(数据几何倍数增长,内存危!慎重!!)
>rdd1: "a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "a" -> 0 >rdd2: "a" -> "A", "b" -> "B", "c" -> "C", "d" -> "D", "a" -> "?" (a,(1,A)) (a,(1,?)) (a,(0,A)) (a,(0,?)) (b,(2,B)) (c,(3,C)) (d,(4,D))
可以看出这join算子就是妥妥的内连接!!
4.2.4.18、Left(Right)OuterJoin
左(右)外连接~ 与内连接什么区别,不用说了吧!忘了的去看MySQL笔记吧~~
由于两者使用起来都一样,所以我们选LeftOuterJoin作为演示:
外连接的特点就是:可以主表中将无法连接的值也保留下来
先来试试正常情况:
object LeftOuterJoinDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4
),
2
)
val rdd2 = sc.makeRDD(
List(
"a" -> "A", "b" -> "B", "c" -> "C", "d" -> "D"
)
)
rdd.leftOuterJoin(rdd2).collect().foreach(println)
/**
* (a,(1,Some(A)))
* (b,(2,Some(B)))
* (c,(3,Some(C)))
* (d,(4,Some(D)))
*/
// 关闭Spark连接
sc.stop()
}
}
从结果中看的出来哈,和join还是有点区别的~
首先我们要确定在rdd.leftOuterJoin(rdd2)
这句中,rdd是主表,rdd2是外联的表~ 所以rdd中的所有kv数据都要保留,至于rdd2中有没有对应的key还不清楚,因此最后value绑定的二元组类型是Tuple2[Int, Option]
(注:Option
类是jdk8中加入的解决空值表示的类,Some表示有值,None表示无值,使用Option类在某些情况下可以避免空指针异常)
我们将不确定的rdd2中的Value使用Option类进行包装,rdd中的value是确定的,所以直接使用即可~~
再来看看标准使用情况,rdd中存在key,而rdd2中没有
object LeftOuterJoinDemo02 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4
),
2
)
val rdd2 = sc.makeRDD(
List(
"a" -> "A", "b" -> "B", "c" -> "C"
)
)
rdd.leftOuterJoin(rdd2).collect().foreach(println)
/**
* (a,(1,Some(A)))
* (b,(2,Some(B)))
* (c,(3,Some(C)))
* (d,(4,None))
*/
// 关闭Spark连接
sc.stop()
}
}
果然,没有匹配值的,直接绑定None~。
最后一种情况:rdd中没有,而rdd2中有的。
结果:不会出现在结果中,外连接一律以主表为准!!
至于RightOuterJoin,使用起来是一样的,主要是分清楚谁是主表!
(LeftOuterJoin调用者是主表,RightOuterJoin参数是主表!)
当然最终结果绑定value形成的元组,元组汇总数据的顺序也是有规律的_1
是调用者表的value,_2
是参数表的value
4.2.4.19、cogroup
cogroup
= connect + group
分组并连接。怎么个连接法?我们试过就知道了
object CoGroupDemo02 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4
),
2
)
val rdd2 = sc.makeRDD(
List(
"a" -> "A", "b" -> "B", "c" -> "C", "d" -> "D"
)
)
rdd.cogroup(rdd2).collect().foreach(println)
/**
* (a,(CompactBuffer(1),CompactBuffer(A)))
* (b,(CompactBuffer(2),CompactBuffer(B)))
* (c,(CompactBuffer(3),CompactBuffer(C)))
* (d,(CompactBuffer(4),CompactBuffer(D)))
*/
// 关闭Spark连接
sc.stop()
}
}
我们从结果,以及collect返回数据类型(String, (Iterable[Int], Iterable[String]))
可以初步看出这个算子的计算过程:
- 每个rdd中数据按key先分组,然后每组将所有value封装成一个集合即
CompactBuffer
。 - 然后多个rdd,在按照key相同分组,然后将各自的value集合打包成一个元组~
改进后测试代码:
object CoGroupDemo02 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "a" -> 5, "c" -> 11
),
2
)
val rdd2 = sc.makeRDD(
List(
"a" -> "A", "b" -> "B", "c" -> "C", "d" -> "D", "a" -> "?", "d" -> "+"
)
)
rdd.cogroup(rdd2).collect().foreach(println)
/**
* (a,(CompactBuffer(1, 5),CompactBuffer(A, ?)))
* (b,(CompactBuffer(2),CompactBuffer(B)))
* (c,(CompactBuffer(3, 11),CompactBuffer(C)))
* (d,(CompactBuffer(4),CompactBuffer(D, +)))
*/
// 关闭Spark连接
sc.stop()
}
}
我们通过cogroup的参数可以看出来,最多可同时进行4个rdd进行分组连接!!
4.2.5、行动算子
前面是我们常使用的转换算子,下面我们学习行动算子。
介绍算子分类的时候说:行动算子会触发任务的执行!
我们最熟知的collect
就是行动算子之一,通过其源码可以看到,方法中(SparkContext)的runJob
方法被调用。
而SparkContext的runJob()
方法底层,又调用了DAGScheduler
(与前面讲到的有向无环图有关)的runJob()
我们继续往下追,发现DAGScheduler的runJob,调用了本类的submitJob()
我们点进去后,发现只是Job的一些准备工作(配置、分区等),这个Job提交,总得有个去处吧~ 于是我们搜索了一下DAGScheduler,发现了一个handlerJobSubmitted
方法:
然后在这个方法中,我们就看到了Job本尊:
一个ActiveJob
对象,激活状态的Job,说明任务开始执行了~~
这一串下来,应该可以初步理解行动算子是如何触发任务执行的了~ 下面我们来学习我们常用的行动算子!
4.2.5.1、reduce
别紧张,其实它和集合中的方法使用相差无几。
功能:对数据按照规则进行聚合
示例:
object ReduceDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
// result:Int
val result = rdd.reduce(_ + _)
/**
* 21
*/
println(result)
// 关闭Spark连接
sc.stop()
}
}
可以看出来,和转换算子最大使用区别在于:
转换算子的返回值依旧是一个RDD,而行动算子返回值直接是一个内存数据
4.2.5.2、collect
这个算子是我们最常用的行动算子,他的功能是:将不同分区的结果数据,按照分区顺序采集到Driver端的内存中!多个分区的数据形成数组!
就不使用代码介绍了
4.2.5.3、count
功能:统计数据源中数据的个数!
object CountDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
// result:Int
val result = rdd.count()
/**
* 6
*/
println(result)
// 关闭Spark连接
sc.stop()
}
}
4.2.5.4、first
见名知意,取出数据源中第一个数据
object FirstDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
val result = rdd.first()
/**
* 1
*/
println(result)
// 关闭Spark连接
sc.stop()
}
}
4.2.5.5、take、takeOrdered
take
:取出前n个数据,并转为数组
object TakeDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 4, 2, 5, 3, 6),2)
val result = rdd.take(2)
println(result.mkString(","))
/**
* 1,4
*/
// 关闭Spark连接
sc.stop()
}
}
takeOrdered
:取出排序后的前n个数据
println(rdd.takeOrdered(2))
/**
* 1,2
*/
默认是升序排序,第二个参数可以指定排序方式。有需要的可以自行百度~
4.2.5.6、aggregate
功能和参数和前面的aggregateByKey
相同,只不过aggregate
算子,直接得出结果。
object AggregateDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(3, 1, 4, 2, 6, 5),2)
// 分区内求最大值,分区间结果累加
val result = rdd.aggregate(0)(
math.max(_, _),
_ + _
)
println(result)
/**
* 10
*/
// 关闭Spark连接
sc.stop()
}
}
有一个关于zeroValue参与计算的问题:
val rdd = sc.makeRDD(List(3, 1, 4, 2, 6, 5),2)
// 分区内求最大值,分区间结果累加
val result = rdd.aggregate(1)(
_ + _,
_ + _
)
println(result)
猜猜看结果:按照前面学习的进行分析,结果是:9+14=23,但是最终的结果是:24!!
这是由于zeroValue不仅参与了分区内的计算,并且后面分区间的合并计算zeroValue也参与了~ 所以相较于源数据多了三个zeroValue~~
但是前面aggregateByKey不会出现这种情况~
4.2.5.6、fold
aggregate的简化版本,使用请参考aggregate~
同样zeroValue也会参与分区间的计算!!
4.2.5.7、countByKey、countByValue
按照key或者value统计元素个数~
countByValue:
object CountValueDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
1, 3, 4, 2, 1
),
2
)
val result = rdd.countByValue()
println(result)
/**
* Map(4 -> 1, 2 -> 1, 1 -> 2, 3 -> 1)
*/
// 关闭Spark连接
sc.stop()
}
}
结果表示,value=4、2、3出现了一次,value=1出现了两次。
countByKey:
object CountByKeyDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
"a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4
),
2
)
val result = rdd.countByKey()
println(result)
/**
* Map(b -> 1, a -> 2, c -> 1)
*/
// 关闭Spark连接
sc.stop()
}
}
结果是什么意思,懂得都懂。注意!countByKey只使用与kv类型的数据!而countByValue对于kv类型数据是将kv整体视为value!
4.2.5.8、save
我们最常用的saveAsTextFile
也是行动算子,
相类似的还有:saveAsObjectFile
、saveAsHadoopFile
、saveAsSequenceFile
等。
都是将分区数据以文件的形式输出~
4.2.5.9、foreach
回看一下,我们在学习RDD创建时,尝试将内容输出时,遇到了一个小疑问:我们使用collect后然后使用foreach遍历数据 和 直接使用foreach遍历数据最终得到的结果是不一样的!并且!后者遍历的结果每次都不相同!!
现在就来解开疑问:它们俩根本就不是一个东西!
一个是行动算子,而另一个是集合的方法~
并且导致第二种方式执行结果不稳定的原因是:foreach算子的执行完全由每个Executor负责完成!而foreach方法调用是在Driver中,对其内存中的集合进行遍历
使用了collect方法,会按照分区顺序采集数据到Driver内存中形成集合,这样数据是稳定的,而每个Executor在执行foreach算子的时候,顺序和速度我们无法干预,所以就有了执行结果次次不同的原因!!
object ForeachDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]").setAppName("CreateRDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6),2)
println("=====内存数组遍历=======")
rdd.collect().foreach(println)
println("=====foreach算子=======")
rdd.foreach(println)
/**
* =====内存数组遍历=======
* 1
* 2
* 3
* 4
* 5
* 6
* =====foreach算子=======
* 4
* 1
* 5
* 2
* 6
* 3
*/
// 关闭Spark连接
sc.stop()
}
}
4.2.6、算子小结
再来说一下算子,我们为什么称其为算子?!
是为了和Scala中的集合操作方法进行区别!
算子有个很大的特点:所有的转换算子,在调用的时候并不会立刻执行!直到行动算子触发了任务提交运行,算子内的代码才会运行!(并且算子的运行位置是有讲究的!)
所有算子内的代码,都是发送到Executor中执行的,所有算子外的代码都是在Driver端内存中执行的!
4.2.7、闭包检查和序列化
前置知识点:Scala闭包,Java序列化
Scala闭包
在Scala中,若函数用到了函数外的变量,那么这些变量和函数共同形成一个封闭的整体,我们称其为闭包!
Java序列化
在Java.net章节以及IO章节中,我们知道一个对象在网络中进行传输,是需要首先将对象转换为为字节数组,这个过程我们称为序列化; 传输完成后,我们将字节数组还原成对象,此过程称为反序列化
你可能感觉到我要说什么了~ 但是有说不清他们之间有什么关系,我们先来看一段代码吧:
object TestSerialize {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]").setAppName("TestSerialize")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List("Hello", "World", "Spark", "Scala"), 1)
val counter = new Counter
rdd.map(
str => {
counter.increase()
println(str + "=>" + counter.cnt)
}
).collect()
println("数据条数:" + counter.cnt)
sc.stop()
}
/**
* 计数器类
*/
class Counter {
var cnt:Int = _
def increase(): Unit = {
cnt += 1
}
}
}
你觉得这段代码的运行结果是什么? 实际结果是:
好家伙!我直接好家伙!
主要的问题是:Task not serializable,核心问题则是我们的Counter类没有序列化。
为什么没有序列化呢?!需要继承serializable接口。那Scala也是一样!
trait Serializable extends Any with java.io.Serializable
Scala的序列化特质,直接使用的java.io中的序列化接口,这就是我们常在Java中使用的序列化接口~
好,我们给Count混入序列化接口后,程序可以正常运行啦~~
另外一种解决方案:使用样例类,(case class
)默认混入了序列化接口!!
现在来解释一下为什么会报序列化相关的错误:
再说一遍:算子以内的代码由Executor执行,算子以外的代码在Driver端内存中执行!
分析代码就知道:
counter的创建是在Driver中,counter对象存在于Driver端的内存中!但是对counter的调用在Executor端。
这就出现了问题:
- map算子中的匿名函数调用了外部的counter对象,counter对象和此匿名函数形成闭包!!
- 为了Executor顺利执行算子中的代码,所以会将counter对象连同RDD发送到Executor端!
- Job的闭包检测发现,要发送到Executor端的Counter对象无法序列化,强制叫停了任务,并抛出了异常!
由于算子内外的代码的执行位置不同,所以说算子有时难以避免使用外部的变量,所以与算子中匿名函数形成闭包的类一定要混入序列化接口!!
都说到这里了,我们来看下源码中的闭包检查以及序列化检查:
在多层重载runJob的调用过程中,cleanFunc = clean(func)
就是在对我们的算子中的匿名函数做闭包检查和清理。
然后调用了ClosureCleaner的clean方法:
最后调用真实的clean方法,对匿名函数进行闭包清理以及序列化检查!
好的,我们回到案例代码中看一下运行结果,你会感到匪夷所思的!
Hello=>1
World=>2
Spark=>3
Scala=>4
数据条数:0
我们的数据只有一个分区,所以也就只有一个Executor。我们将分区设置为两个后的结果:
Hello=>1
World=>2
Spark=>1
Scala=>2
数据条数:0
悟了嘛?!反正我懂了大概是怎么回事了!无奈无法调试,我们只能猜想:
画图不太好,将就看吧~
由于传输过程是在节点和节点之间的(即便程序运行是单机的,但是模拟实际生产多节点~)所以不可能在Executor节点中执行的代码改变Driver内存中的对象! 所以我猜传输到Executor节点的对象是Driver中原对象的一个复制品!有多少个分区就有多少个复制品!
我们的count.increase()在Executor节点上执行,改变的只是复制品的cnt,代码中最终println是算子外的代码,是在Driver端执行的!所以取值的也只能是Driver端的counter对象,理所当然就是0啦!
再一次说明了算子内的代码发送到Executor端执行,算子外的代码在Driver端执行!
4.2.8、闭包检查(二)
我们再来看一段代码:
object TestSerialize02 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]").setAppName("TestSerialize")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List("Hello", "World", "Spark", "Scala"), 1)
val searcher = new Searcher("S")
val resultRDD = searcher.search(rdd)
resultRDD.collect().foreach(println)
sc.stop()
}
/**
* 检索类
* @param target 检索目标
*/
class Searcher(target: String) {
def search(rdd: RDD[String]): RDD[String] ={
rdd.filter(_.contains(target))
}
}
}
还是一样报错了!也是说Searcher无法进行序列化!
可以发现我们仅使用的转换算子filter内的代码只有:
_.contains(target)
这个匿名函数使用的外面变量是一个String类型的Searcher构造参数target~ 按理说就算target和这个匿名函数形成闭包,String默认就实现了序列化接口啊!怎么会扯到Searcher类呢?!
原因:构造参数也是类的一部分!是类的属性!依托于实例对象!
你以为只用到了一个String变量,实际你是通过类对象进行调用的,理应将对象进行序列化!
那么有什么手段可以让我们避免出现这种情况呢?!可不可以不序列化对象?
我们从闭包的定义开始入手:当函数内使用了外部变量会形成闭包!所以我们要保证算子内使用的变量是与类剥离开的!并且这个变量可以序列化
修改后:
class Searcher(target: String) {
def search(rdd: RDD[String]): RDD[String] ={
var ch = target
rdd.filter(_.contains(ch))
}
}
妙啊! 可以看到我们在target和匿名函数中间加上了一个ch变量! 这个变量并不依托于对象,即便是与匿名函数形成了闭包,String自带序列化。我们就省去了Searcher对象的序列化!
要好好理解闭包的形成,要正确分析出代码的执行位置到底是在Driver还是Executor!
conf.setMaster(“local[*]”).setAppName(“TestSerialize”)
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List("Hello", "World", "Spark", "Scala"), 1)
val counter = new Counter
rdd.map(
str => {
counter.increase()
println(str + "=>" + counter.cnt)
}
).collect()
println("数据条数:" + counter.cnt)
sc.stop()
}
/**
- 计数器类
*/
class Counter {
var cnt:Int = _
def increase(): Unit = {
cnt += 1
}
}
}
你觉得这段代码的运行结果是什么? 实际结果是:
[外链图片转存中...(img-1DlUVGY2-1610552910752)]
好家伙!我直接好家伙!
**主要的问题是:Task not serializable,核心问题则是我们的Counter类没有序列化。**
为什么没有序列化呢?!需要继承serializable接口。那Scala也是一样!
```scala
trait Serializable extends Any with java.io.Serializable
Scala的序列化特质,直接使用的java.io中的序列化接口,这就是我们常在Java中使用的序列化接口~
好,我们给Count混入序列化接口后,程序可以正常运行啦~~
另外一种解决方案:使用样例类,(case class
)默认混入了序列化接口!!
现在来解释一下为什么会报序列化相关的错误:
再说一遍:算子以内的代码由Executor执行,算子以外的代码在Driver端内存中执行!
分析代码就知道:
counter的创建是在Driver中,counter对象存在于Driver端的内存中!但是对counter的调用在Executor端。
这就出现了问题:
- map算子中的匿名函数调用了外部的counter对象,counter对象和此匿名函数形成闭包!!
- 为了Executor顺利执行算子中的代码,所以会将counter对象连同RDD发送到Executor端!
- Job的闭包检测发现,要发送到Executor端的Counter对象无法序列化,强制叫停了任务,并抛出了异常!
由于算子内外的代码的执行位置不同,所以说算子有时难以避免使用外部的变量,所以与算子中匿名函数形成闭包的类一定要混入序列化接口!!
都说到这里了,我们来看下源码中的闭包检查以及序列化检查:
在多层重载runJob的调用过程中,cleanFunc = clean(func)
就是在对我们的算子中的匿名函数做闭包检查和清理。
然后调用了ClosureCleaner的clean方法:
最后调用真实的clean方法,对匿名函数进行闭包清理以及序列化检查!
好的,我们回到案例代码中看一下运行结果,你会感到匪夷所思的!
Hello=>1
World=>2
Spark=>3
Scala=>4
数据条数:0
我们的数据只有一个分区,所以也就只有一个Executor。我们将分区设置为两个后的结果:
Hello=>1
World=>2
Spark=>1
Scala=>2
数据条数:0
悟了嘛?!反正我懂了大概是怎么回事了!无奈无法调试,我们只能猜想:
画图不太好,将就看吧~
由于传输过程是在节点和节点之间的(即便程序运行是单机的,但是模拟实际生产多节点~)所以不可能在Executor节点中执行的代码改变Driver内存中的对象! 所以我猜传输到Executor节点的对象是Driver中原对象的一个复制品!有多少个分区就有多少个复制品!
我们的count.increase()在Executor节点上执行,改变的只是复制品的cnt,代码中最终println是算子外的代码,是在Driver端执行的!所以取值的也只能是Driver端的counter对象,理所当然就是0啦!
再一次说明了算子内的代码发送到Executor端执行,算子外的代码在Driver端执行!
4.2.8、闭包检查(二)
我们再来看一段代码:
object TestSerialize02 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]").setAppName("TestSerialize")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List("Hello", "World", "Spark", "Scala"), 1)
val searcher = new Searcher("S")
val resultRDD = searcher.search(rdd)
resultRDD.collect().foreach(println)
sc.stop()
}
/**
* 检索类
* @param target 检索目标
*/
class Searcher(target: String) {
def search(rdd: RDD[String]): RDD[String] ={
rdd.filter(_.contains(target))
}
}
}
还是一样报错了!也是说Searcher无法进行序列化!
可以发现我们仅使用的转换算子filter内的代码只有:
_.contains(target)
这个匿名函数使用的外面变量是一个String类型的Searcher构造参数target~ 按理说就算target和这个匿名函数形成闭包,String默认就实现了序列化接口啊!怎么会扯到Searcher类呢?!
原因:构造参数也是类的一部分!是类的属性!依托于实例对象!
你以为只用到了一个String变量,实际你是通过类对象进行调用的,理应将对象进行序列化!
那么有什么手段可以让我们避免出现这种情况呢?!可不可以不序列化对象?
我们从闭包的定义开始入手:当函数内使用了外部变量会形成闭包!所以我们要保证算子内使用的变量是与类剥离开的!并且这个变量可以序列化
修改后:
class Searcher(target: String) {
def search(rdd: RDD[String]): RDD[String] ={
var ch = target
rdd.filter(_.contains(ch))
}
}
妙啊! 可以看到我们在target和匿名函数中间加上了一个ch变量! 这个变量并不依托于对象,即便是与匿名函数形成了闭包,String自带序列化。我们就省去了Searcher对象的序列化!
要好好理解闭包的形成,要正确分析出代码的执行位置到底是在Driver还是Executor!